In this article, we will explore individual machine learning models in detail, starting with recommendation engines.


Recommendation engines are probably among the types of machine learning model best known to the general public. Even if people do not know exactly what a recommendation engine is, they have most likely experienced one through the use of popular websites such as Amazon, Netflix, YouTube, Twitter, LinkedIn, and Facebook. Recommendations are a core part of all these businesses, and in some cases, they drive significant percentages of their revenue.

The idea behind recommendation engines is to predict what people might like and to uncover relationships between items to aid in the discovery process (in this way, they are similar and, in fact, often complementary to search engines, which also play a role in discovery). However, unlike search engines, recommendation engines try to present people with relevant content that they did not necessarily search for or that they might not even have heard of.

Typically, a recommendation engine tries to model the connections between users and some type of item. For example, if we can do a good job of showing our users movies related to a given movie, we can aid discovery and navigation on our site, improving our users’ experience, engagement, and the relevance of our content to them.

However, recommendation engines are not limited to movies, books, or products. The techniques we will explore in this article can be applied to just about any user-to-item relationship as well as user-to-user connections, such as those found on social networks, allowing us to make recommendations such as people you may know or who to follow.

Recommendation engines are most effective in two general scenarios (which are not mutually exclusive). They are explained here:

- **Large number of available options for users**: When there are a very large number of available items, it becomes increasingly difficult for the user to find something they want. Searching can help when the user knows what they are looking for, but often, the right item might be something previously unknown to them. In this case, being recommended relevant items, that the user may not already know about, can help them discover new items.
- **A significant degree of personal taste involved**: When personal taste plays a large role in selection, recommendation models, which often utilize a wisdom of the crowd approach, can be helpful in discovering items based on the behavior of others that have similar taste profiles.

In this article, we will:

- Introduce the various types of recommendation engines
- Build a recommendation model using data about user preferences
- Use the trained model to compute recommendations for a given user as well as to compute similar items for a given item (that is, related items)
- Apply standard evaluation metrics to the model that we created to measure how well it performs in terms of predictive capability

# Types of recommendation models

Recommender systems are widely studied, and there are many approaches used, but there are two that are probably most prevalent: content-based filtering and collaborative filtering. Recently, other approaches such as ranking models have also gained in popularity. In practice, many approaches are hybrids, incorporating elements of many different methods into a model or combination of models.

## Content-based filtering

Content-based methods try to use the content or attributes of an item, together with some notion of similarity between two pieces of content, to generate items similar to a given item. These attributes are often textual content (such as titles, names, tags, and other metadata attached to an item), or in the case of media, they could include other features of the item, such as attributes extracted from audio and video content.

In a similar manner, user recommendations can be generated based on attributes of users or user profiles, which are then matched to item attributes using the same measure of similarity. For example, a user can be represented by the combined attributes of the items they have interacted with. This becomes their user profile, which is then compared to item attributes to find items that match the user profile.

## Collaborative filtering

Collaborative filtering is a form of wisdom of the crowd approach where the set of preferences of many users with respect to items is used to generate estimated preferences of users for items with which they have not yet interacted. The idea behind this is the notion of similarity.

In a user-based approach, if two users have exhibited similar preferences (that is, patterns of interacting with the same items in broadly the same way), then we would assume that they are similar to each other in terms of taste. To generate recommendations for unknown items for a given user, we can use the known preferences of other users that exhibit similar behavior. We can do this by selecting a set of similar users and computing some form of combined score based on the items they have shown a preference for. The overall logic is that if other users with similar tastes have shown a preference for a set of items, these items would tend to be good candidates for recommendation.

We can also take an item-based approach that computes some measure of similarity between items. This is usually based on the existing user-item preferences or ratings. Items that tend to be rated the same by similar users will be classed as similar under this approach. Once we have these similarities, we can represent a user in terms of the items they have interacted with and find items that are similar to these known items, which we can then recommend to the user. Again, a set of items similar to the known items is used to generate a combined score that serves as the estimate for an unknown item.

The user- and item-based approaches are usually referred to as nearest-neighbor models, since the estimated scores are computed based on the set of most similar users or items (that is, their neighbors).
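To make the user-based idea concrete, here is a minimal sketch in plain Scala (no Spark required). The users, items, and ratings are made up for illustration, and the choice of cosine similarity with a similarity-weighted average is just one of several reasonable ways to combine neighbors' ratings; the names `prefs`, `userSimilarity`, and `predictRating` are hypothetical.

```scala
// Hypothetical toy data: user -> (item -> rating)
val prefs: Map[String, Map[String, Double]] = Map(
  "ann" -> Map("a" -> 5.0, "b" -> 4.0),
  "bob" -> Map("a" -> 5.0, "b" -> 4.0, "c" -> 5.0),
  "cat" -> Map("a" -> 1.0, "b" -> 2.0, "c" -> 1.0)
)

// Cosine similarity between two users' rating vectors (a missing rating counts as 0)
def userSimilarity(x: Map[String, Double], y: Map[String, Double]): Double = {
  val dot = x.keySet.intersect(y.keySet).toSeq.map(k => x(k) * y(k)).sum
  val normX = math.sqrt(x.values.map(v => v * v).sum)
  val normY = math.sqrt(y.values.map(v => v * v).sum)
  if (normX == 0.0 || normY == 0.0) 0.0 else dot / (normX * normY)
}

// Similarity-weighted average of the ratings that other users gave to the item
def predictRating(user: String, item: String): Option[Double] = {
  val neighbors = prefs.toSeq.collect {
    case (other, ratings) if other != user && ratings.contains(item) =>
      (userSimilarity(prefs(user), ratings), ratings(item))
  }
  val totalSimilarity = neighbors.map(_._1).sum
  if (totalSimilarity == 0.0) None
  else Some(neighbors.map { case (s, r) => s * r }.sum / totalSimilarity)
}

// Estimated rating of item "c" for user "ann", who has not rated it
val estimate = predictRating("ann", "c")
```

An item-based variant would follow the same pattern with the roles of users and items swapped.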

Finally, there are many model-based methods that attempt to model the user-item preferences themselves so that new preferences can be estimated directly by applying the model to unknown user-item combinations.

### Matrix factorization

Since Spark’s recommendation models currently only include an implementation of matrix factorization, we will focus our attention on this class of models. This focus is with good reason, however: these types of models have consistently been shown to perform extremely well in collaborative filtering and were among the best models in well-known competitions such as the Netflix prize.

For more information on and a brief overview of the performance of the best algorithms for the Netflix prize, see http://techblog.netflix.com/2012/04/netflix-recommendations-beyond-5-stars.html.

#### Explicit matrix factorization

When we deal with data that consists of preferences of users that are provided by the users themselves, we refer to explicit preference data. This includes, for example, ratings, thumbs up, likes, and so on that are given by users to items.

We can take these ratings and form a two-dimensional matrix with users as rows and items as columns. Each entry represents a rating given by a user to a certain item. Since in most cases, each user has only interacted with a relatively small set of items, this matrix has only a few non-zero entries (that is, it is very sparse).

As a simple example, let’s assume that we have the following user ratings for a set of movies:

```
Tom, Star Wars, 5
Jane, Titanic, 4
Bill, Batman, 3
Jane, Star Wars, 2
Bill, Titanic, 3
```

We will form the following ratings matrix:

A simple movie-rating matrix
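As a rough illustration, the ratings matrix for the five ratings listed above can be assembled in plain Scala. This is only a sketch: the dense array with `0.0` standing for a missing rating is an assumption made for readability, and a real system would use a sparse representation.

```scala
// The five (user, movie, rating) triples from the example
val triples = Seq(
  ("Tom", "Star Wars", 5.0),
  ("Jane", "Titanic", 4.0),
  ("Bill", "Batman", 3.0),
  ("Jane", "Star Wars", 2.0),
  ("Bill", "Titanic", 3.0)
)

// Fix a row order for users and a column order for movies (order of first appearance)
val users = triples.map(_._1).distinct
val movies = triples.map(_._2).distinct

// Dense users x movies matrix; 0.0 marks a missing rating in this sketch
val matrix = Array.fill(users.size, movies.size)(0.0)
for ((u, m, r) <- triples) matrix(users.indexOf(u))(movies.indexOf(m)) = r

// Print a header row followed by one row per user
println(("" +: movies).mkString("\t"))
users.zip(matrix).foreach { case (u, row) =>
  println((u +: row.map(_.toString)).mkString("\t"))
}
```

Even in this tiny example, four of the nine cells are empty, which hints at how sparse a realistic user-item matrix is.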

Matrix factorization (or matrix completion) attempts to directly model this user-item matrix by representing it as a product of two smaller matrices of lower dimension. Thus, it is a dimensionality-reduction technique. If we have **U** users and **I** items, then our user-item matrix is of dimension U x I and might look something like the one shown in the following diagram:

A sparse ratings matrix

If we want to find a lower dimension (low-rank) approximation to our user-item matrix with the dimension **k**, we would end up with two matrices: one for users of size U x k and one for items of size I x k. These are known as factor matrices. If we multiply these two factor matrices, we would reconstruct an approximate version of the original ratings matrix. Note that while the original ratings matrix is typically very sparse, each factor matrix is dense, as shown in the following diagram:

The user- and item-factor matrices

These models are often also called latent feature models, as we are trying to discover some form of hidden features (which are represented by the factor matrices) that account for the structure of behavior inherent in the user-item rating matrix. While the latent features or factors are not directly interpretable, they might, perhaps, represent things such as the tendency of a user to like movies from a certain director, genre, style, or group of actors, for example.

As we are directly modeling the user-item matrix, the prediction in these models is relatively straightforward: to compute a predicted rating for a user and item, we compute the vector dot product between the relevant row of the user-factor matrix (that is, the user’s factor vector) and the relevant row of the item-factor matrix (that is, the item’s factor vector).

This is illustrated with the highlighted vectors in the following diagram:

Computing recommendations from user- and item-factor vectors
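The prediction step can be sketched in a few lines of plain Scala. The factor values below are invented purely for illustration (a trained model would supply them), and `dotProduct` is a hypothetical helper name.

```scala
// Hypothetical factor vectors with k = 3; a trained model would supply these
val userFactor = Array(0.5, 1.0, -0.5)
val itemFactor = Array(2.0, 1.5, 1.0)

// The predicted rating is the dot product of the two factor vectors
def dotProduct(a: Array[Double], b: Array[Double]): Double = {
  require(a.length == b.length, "factor vectors must have the same length")
  a.zip(b).map { case (x, y) => x * y }.sum
}

val predicted = dotProduct(userFactor, itemFactor) // 0.5*2.0 + 1.0*1.5 + (-0.5)*1.0 = 2.0
```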

To find out the similarity between two items, we can use the same measures of similarity as we would use in the nearest-neighbor models, except that we can use the factor vectors directly by computing the similarity between two item-factor vectors, as illustrated in the following diagram:

Computing similarity with item-factor vectors
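As one possible choice of similarity measure, cosine similarity between two item-factor vectors can be sketched as follows. The vectors are made up for illustration, and `cosineSimilarity` is a hypothetical helper; the sketch assumes neither vector is all zeros.

```scala
// Cosine similarity: dot product divided by the product of the vector norms
def cosineSimilarity(a: Array[Double], b: Array[Double]): Double = {
  require(a.length == b.length, "factor vectors must have the same length")
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  val normA = math.sqrt(a.map(x => x * x).sum)
  val normB = math.sqrt(b.map(x => x * x).sum)
  dot / (normA * normB)
}

// Hypothetical item-factor vectors
val itemA = Array(1.0, 2.0, 0.0)
val itemB = Array(2.0, 4.0, 0.0)  // same direction as itemA
val itemC = Array(-2.0, 1.0, 0.0) // orthogonal to itemA

val simAB = cosineSimilarity(itemA, itemB) // close to 1.0
val simAC = cosineSimilarity(itemA, itemC) // 0.0
```

Because cosine similarity ignores vector length, items whose factors point in the same direction score as highly similar even when the magnitudes differ.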

The benefit of factorization models is the relative ease of computing recommendations once the model is created. However, for very large user and item sets, this can become a challenge as it requires storage and computation across potentially many millions of user- and item-factor vectors. Another advantage, as mentioned earlier, is that they tend to offer very good performance.

Projects such as Oryx (https://github.com/OryxProject/oryx) and Prediction.io (https://github.com/PredictionIO/PredictionIO) focus on model serving for large-scale models, including recommenders based on matrix factorization.

On the downside, factorization models are relatively more complex to understand and interpret compared to nearest-neighbor models and are often more computationally intensive during the model’s training phase.

#### Implicit matrix factorization

So far, we have dealt with explicit preferences such as ratings. However, much of the preference data that we might be able to collect is implicit feedback, where the preferences between a user and item are not given to us, but are, instead, implied from the interactions they might have with an item. Examples include binary data (such as whether a user viewed a movie, whether they purchased a product, and so on) as well as count data (such as the number of times a user watched a movie).

There are many different approaches to deal with implicit data. MLlib implements a particular approach that treats the input rating matrix as two matrices: a binary preference matrix, **P**, and a matrix of confidence weights, **C**.

For example, let’s assume that the user-movie ratings we saw previously were, in fact, the number of times each user had viewed that movie. The two matrices would look something like the ones shown in the following screenshot. Here, the matrix **P** informs us that a movie was viewed by a user, and the matrix **C** represents the confidence weighting, in the form of the view counts. Generally, the more a user has watched a movie, the higher the confidence that they actually like it.

Representation of an implicit preference and confidence matrix
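A minimal sketch of how the two matrices could be derived from the view counts, in plain Scala. The row and column order (Tom, Jane, Bill by Star Wars, Titanic, Batman) is an assumption, and the confidence is taken to be the raw count as described above; an actual implementation may scale or transform the counts.

```scala
// View counts (rows: Tom, Jane, Bill; columns: Star Wars, Titanic, Batman)
val counts = Array(
  Array(5.0, 0.0, 0.0),
  Array(2.0, 4.0, 0.0),
  Array(0.0, 3.0, 3.0)
)

// P: binary preference, 1.0 where the user viewed the movie at least once
val P = counts.map(_.map(c => if (c > 0) 1.0 else 0.0))

// C: confidence weights, here simply a copy of the view counts
val C = counts.map(_.clone)

P.foreach(row => println(row.mkString(" ")))
```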

The implicit model still creates a user- and item-factor matrix. In this case, however, the matrix that the model is attempting to approximate is not the overall ratings matrix but the preference matrix P. If we compute a recommendation by calculating the dot product of a user- and item-factor vector, the score will not be an estimate of a rating directly. It will rather be an estimate of the preference of a user for an item (though not strictly between 0 and 1, these scores will generally be fairly close to a scale of 0 to 1).

#### Alternating least squares

**Alternating Least Squares** (**ALS**) is an optimization technique to solve matrix factorization problems; this technique is powerful, achieves good performance, and has proven to be relatively easy to implement in a parallel fashion. Hence, it is well suited for platforms such as Spark. At the time of writing this, it is the only recommendation model implemented in MLlib.

ALS works by iteratively solving a series of least squares regression problems. In each iteration, one of the user- or item-factor matrices is treated as fixed, while the other one is updated using the fixed factor and the rating data. Then, the factor matrix that was solved for is, in turn, treated as fixed, while the other one is updated. This process continues until the model has converged (or for a fixed number of iterations).
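The alternating scheme can be illustrated with a deliberately tiny sketch in plain Scala. It assumes a single latent factor (rank 1), so that each least squares problem has a scalar closed-form solution, and it applies a plain L2 penalty `lambda`; the data, the initial values, and the helper names `loss` and `solve` are all made up for illustration. This is not how MLlib implements ALS, only the same idea at toy scale.

```scala
// Observed (userIndex, itemIndex, rating) entries of a small ratings matrix
val observed = Seq((0, 0, 5.0), (1, 1, 4.0), (2, 2, 3.0), (1, 0, 2.0), (2, 1, 3.0))
val numUsers = 3
val numItems = 3
val lambda = 0.1 // regularization strength (an arbitrary choice for this sketch)

// Regularized squared error over the observed entries only
def loss(u: Array[Double], v: Array[Double]): Double =
  observed.map { case (i, j, r) => math.pow(r - u(i) * v(j), 2) }.sum +
    lambda * (u.map(x => x * x).sum + v.map(x => x * x).sum)

// Closed-form ridge solution for one side while the other side is held fixed
def solve(n: Int, fixed: Array[Double], byUser: Boolean): Array[Double] =
  Array.tabulate(n) { idx =>
    val rows = observed.collect {
      case (i, j, r) if byUser && i == idx => (fixed(j), r)
      case (i, j, r) if !byUser && j == idx => (fixed(i), r)
    }
    val numerator = rows.map { case (f, r) => f * r }.sum
    val denominator = rows.map { case (f, _) => f * f }.sum + lambda
    numerator / denominator
  }

var u = Array.fill(numUsers)(1.0) // user factors
var v = Array.fill(numItems)(1.0) // item factors
val initialLoss = loss(u, v)

// Alternate for a fixed number of iterations, recording the loss after each one
val losses = (1 to 10).map { _ =>
  u = solve(numUsers, v, byUser = true)  // fix item factors, update user factors
  v = solve(numItems, u, byUser = false) // fix user factors, update item factors
  loss(u, v)
}
losses.foreach(println)
```

Because each half-step exactly minimizes the objective with respect to one side, the recorded loss never increases from one iteration to the next. Each user's (or item's) update depends only on the fixed side and that user's own ratings, which is what makes the method easy to parallelize.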

Spark’s documentation for collaborative filtering contains references to the papers that underlie the ALS algorithms implemented for both explicit and implicit data. You can view the documentation at http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html.

# Extracting the right features from your data

In this section, we will use explicit rating data, without additional user or item metadata or other information related to the user-item interactions. Hence, the features that we need as inputs are simply the user IDs, movie IDs, and the ratings assigned to each user and movie pair.

## Extracting features from the MovieLens 100k dataset

Start the Spark shell in the Spark base directory, ensuring that you provide enough memory via the *--driver-memory* option:

`>./bin/spark-shell --driver-memory 4g`

In this example, we will use the same MovieLens dataset. Use the directory in which you placed the MovieLens 100k dataset as the input path in the following code.

First, let’s inspect the raw ratings dataset:

```
val rawData = sc.textFile("/PATH/ml-100k/u.data")
rawData.first()
```

You will see output similar to these lines of code:

```
14/03/30 11:42:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/03/30 11:42:41 WARN LoadSnappy: Snappy native library not loaded
14/03/30 11:42:41 INFO FileInputFormat: Total input paths to process : 1
14/03/30 11:42:41 INFO SparkContext: Starting job: first at <console>:15
14/03/30 11:42:41 INFO DAGScheduler: Got job 0 (first at <console>:15) with 1 output partitions (allowLocal=true)
14/03/30 11:42:41 INFO DAGScheduler: Final stage: Stage 0 (first at <console>:15)
14/03/30 11:42:41 INFO DAGScheduler: Parents of final stage: List()
14/03/30 11:42:41 INFO DAGScheduler: Missing parents: List()
14/03/30 11:42:41 INFO DAGScheduler: Computing the requested partition locally
14/03/30 11:42:41 INFO HadoopRDD: Input split: file:/Users/Nick/workspace/datasets/ml-100k/u.data:0+1979173
14/03/30 11:42:41 INFO SparkContext: Job finished: first at <console>:15, took 0.030533 s
res0: String = 196 242 3 881250949
```

Recall that this dataset consists of the *user id*, *movie id*, *rating*, and *timestamp* fields separated by a tab (*"\t"*) character. We don’t need the time when the rating was made to train our model, so let’s simply extract the first three fields:

`val rawRatings = rawData.map(_.split("\t").take(3))`

We will first split each record on the *"\t"* character, which gives us an *Array[String]* array. We will then use Scala’s *take* function to keep only the first *3* elements of the array, which correspond to *user id*, *movie id*, and *rating*, respectively.
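The split-and-take step can be tried on a single line in plain Scala, outside Spark. The sample line below mirrors the first record shown earlier; note that the tab must be written as the escape sequence `\t`, since splitting on the letter `t` would not separate the fields.

```scala
// One raw line in the tab-separated layout: user id, movie id, rating, timestamp
val line = "196\t242\t3\t881250949"

// Split on the tab character and keep the first three fields
val fields = line.split("\t").take(3)
println(fields.mkString(", ")) // 196, 242, 3

// For comparison: splitting on the letter "t" finds no match and leaves the line whole
val notSplit = line.split("t")
println(notSplit.length) // 1
```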

We can inspect the first record of our new RDD by calling *rawRatings.first()*, which collects just the first record of the RDD back to the driver program. This will result in the following output:

```
14/03/30 12:24:00 INFO SparkContext: Starting job: first at <console>:21
14/03/30 12:24:00 INFO DAGScheduler: Got job 1 (first at <console>:21) with 1 output partitions (allowLocal=true)
14/03/30 12:24:00 INFO DAGScheduler: Final stage: Stage 1 (first at <console>:21)
14/03/30 12:24:00 INFO DAGScheduler: Parents of final stage: List()
14/03/30 12:24:00 INFO DAGScheduler: Missing parents: List()
14/03/30 12:24:00 INFO DAGScheduler: Computing the requested partition locally
14/03/30 12:24:00 INFO HadoopRDD: Input split: file:/Users/Nick/workspace/datasets/ml-100k/u.data:0+1979173
14/03/30 12:24:00 INFO SparkContext: Job finished: first at <console>:21, took 0.00391 s
res6: Array[String] = Array(196, 242, 3)
```

We will use Spark’s MLlib library to train our model. Let’s take a look at what methods are available for us to use and what input is required. First, import the *ALS* model from MLlib:

`import org.apache.spark.mllib.recommendation.ALS`

On the console, we can inspect the available methods on the ALS object using tab completion. Type in *ALS.* (note the dot) and then press the *Tab* key. You should see the autocompletion of the methods:

```
ALS.
asInstanceOf isInstanceOf main toString train trainImplicit
```

The method we want to use is *train*. If we type *ALS.train* and hit *Enter*, we will get an error. However, this error will tell us what the method signature looks like:

```
ALS.train
<console>:12: error: ambiguous reference to overloaded definition,
both method train in object ALS of type (ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], rank: Int, iterations: Int)org.apache.spark.mllib.recommendation.MatrixFactorizationModel
and method train in object ALS of type (ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], rank: Int, iterations: Int, lambda: Double)org.apache.spark.mllib.recommendation.MatrixFactorizationModel
match expected type ?
       ALS.train
           ^
```

So, we can see that at a minimum, we need to provide the input arguments, *ratings*, *rank*, and *iterations*. The second method also requires an argument called *lambda*. We’ll cover these three shortly, but let’s take a look at the *ratings* argument. First, let’s import the *Rating* class that it references and use a similar approach to find out what an instance of *Rating* requires, by typing in *Rating()* and hitting *Enter*:

```
import org.apache.spark.mllib.recommendation.Rating
Rating()
<console>:13: error: not enough arguments for method apply: (user: Int, product: Int, rating: Double)org.apache.spark.mllib.recommendation.Rating in object Rating.
Unspecified value parameters user, product, rating.
       Rating()
             ^
```

As we can see from the preceding output, we need to provide the *ALS* model with an RDD that consists of *Rating* records. A *Rating* class, in turn, is just a wrapper around *user id*, *movie id* (called *product* here), and the actual *rating* arguments. We’ll create our rating dataset using the *map* method and transforming the array of IDs and ratings into a *Rating* object:

`val ratings = rawRatings.map { case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble) }`

Notice that we need to use *toInt* or *toDouble* to convert the raw rating data (which was extracted as *Strings* from the text file) to *Int* or *Double* numeric inputs. Also, note the use of a *case* statement that allows us to extract the relevant variable names and use them directly (this saves us from having to use something like *val user = ratings(0)*).

For more on Scala case statements and pattern matching as used here, take a look at http://docs.scala-lang.org/tutorials/tour/pattern-matching.html.
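The same pattern-matching idiom can be tried without Spark. In the sketch below, `SimpleRating` is a hypothetical stand-in for MLlib's *Rating* class, and the second input record is made up; the point is only to show how *case Array(...)* binds the three fields by name before they are converted.

```scala
// Hypothetical stand-in for MLlib's Rating, so that the sketch runs without Spark
case class SimpleRating(user: Int, product: Int, rating: Double)

// Records as they look after split and take(3); the second one is invented
val rawRecords = Seq(Array("196", "242", "3"), Array("1", "2", "4.5"))

// The case pattern names the three array elements, which are then converted to numbers
val parsed = rawRecords.map { case Array(user, movie, rating) =>
  SimpleRating(user.toInt, movie.toInt, rating.toDouble)
}
parsed.foreach(println)
```

A record with a different number of fields would not match the pattern and would raise a *MatchError*, so malformed lines would need to be filtered out or handled beforehand.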

We now have an *RDD[Rating]* that we can verify by calling:

```
ratings.first()
14/03/30 12:32:48 INFO SparkContext: Starting job: first at <console>:24
14/03/30 12:32:48 INFO DAGScheduler: Got job 2 (first at <console>:24) with 1 output partitions (allowLocal=true)
14/03/30 12:32:48 INFO DAGScheduler: Final stage: Stage 2 (first at <console>:24)
14/03/30 12:32:48 INFO DAGScheduler: Parents of final stage: List()
14/03/30 12:32:48 INFO DAGScheduler: Missing parents: List()
14/03/30 12:32:48 INFO DAGScheduler: Computing the requested partition locally
14/03/30 12:32:48 INFO HadoopRDD: Input split: file:/Users/Nick/workspace/datasets/ml-100k/u.data:0+1979173
14/03/30 12:32:48 INFO SparkContext: Job finished: first at <console>:24, took 0.003752 s
res8: org.apache.spark.mllib.recommendation.Rating = Rating(196,242,3.0)
```

# Training the recommendation model

Once we have extracted these simple features from our raw data, we are ready to proceed with model training; MLlib takes care of this for us. All we have to do is provide the correctly-parsed input RDD we just created as well as our chosen model parameters.

## Training a model on the MovieLens 100k dataset

We’re now ready to train our model! The other inputs required for our model are as follows:

- *rank*: This refers to the number of factors in our ALS model, that is, the number of hidden features in our low-rank approximation matrices. Generally, a greater number of factors gives a better fit, but this has a direct impact on memory usage, both for computation and to store models for serving, particularly for a large number of users or items. Hence, this is often a trade-off in real-world use cases. A rank in the range of 10 to 200 is usually reasonable.
- *iterations*: This refers to the number of iterations to run. Each iteration in *ALS* is guaranteed not to increase the training objective (the regularized reconstruction error of the ratings matrix), and *ALS* models tend to converge to a reasonably good solution after relatively few iterations. So, we don’t need to run for too many iterations in most cases (around 10 is often a good default).
- *lambda*: This parameter controls the regularization of our model. Thus, *lambda* controls overfitting. The higher the value of *lambda*, the more regularization is applied. What constitutes a sensible value is very dependent on the size, nature, and sparsity of the underlying data, and as with almost all machine learning models, the regularization parameter is something that should be tuned using out-of-sample test data and cross-validation approaches.

We’ll use *rank* of *50*, *10* iterations, and a lambda parameter of *0.01* to illustrate how to train our model:

`val model = ALS.train(ratings, 50, 10, 0.01)`

This returns a *MatrixFactorizationModel* object, which contains the user and item factors in the form of an RDD of *(id, factor)* pairs. These are called *userFeatures* and *productFeatures*, respectively. For example:

```
model.userFeatures
res14: org.apache.spark.rdd.RDD[(Int, Array[Double])] = FlatMappedRDD[659] at flatMap at ALS.scala:231
```

We can see that the factors are in the form of an *Array[Double]*.

Note that the operations used in MLlib’s *ALS* implementation are lazy transformations, so the actual computation will only be performed once we call some sort of action on the resulting *RDDs* of the user and item factors. We can force the computation using a Spark action such as *count*:

`model.userFeatures.count`

This will trigger the computation, and we will see quite a bit of output similar to the following:

```
14/03/30 13:10:40 INFO SparkContext: Starting job: count at <console>:26
14/03/30 13:10:40 INFO DAGScheduler: Registering RDD 665 (map at ALS.scala:147)
14/03/30 13:10:40 INFO DAGScheduler: Registering RDD 664 (map at ALS.scala:146)
14/03/30 13:10:40 INFO DAGScheduler: Registering RDD 674 (mapPartitionsWithIndex at ALS.scala:164)
...
14/03/30 13:10:45 INFO SparkContext: Job finished: count at <console>:26, took 5.068255 s
res16: Long = 943
```

If we call *count* for the movie factors, we will see the following output:

```
model.productFeatures.count
14/03/30 13:15:21 INFO SparkContext: Starting job: count at <console>:26
14/03/30 13:15:21 INFO DAGScheduler: Got job 10 (count at <console>:26) with 1 output partitions (allowLocal=false)
14/03/30 13:15:21 INFO DAGScheduler: Final stage: Stage 165 (count at <console>:26)
14/03/30 13:15:21 INFO DAGScheduler: Parents of final stage: List(Stage 169, Stage 166)
14/03/30 13:15:21 INFO DAGScheduler: Missing parents: List()
14/03/30 13:15:21 INFO DAGScheduler: Submitting Stage 165 (FlatMappedRDD[883] at flatMap at ALS.scala:231), which has no missing parents
14/03/30 13:15:21 INFO DAGScheduler: Submitting 1 missing tasks from Stage 165 (FlatMappedRDD[883] at flatMap at ALS.scala:231)
...
14/03/30 13:15:21 INFO SparkContext: Job finished: count at <console>:26, took 0.030044 s
res21: Long = 1682
```

As expected, we have one factor array for each user (*943* arrays) and for each movie (*1682* arrays).

### Training a model using implicit feedback data

The standard matrix factorization approach in MLlib deals with explicit ratings. To work with implicit data, you can use the *trainImplicit* method. It is called in a manner similar to the standard *train* method. There is an additional parameter, *alpha*, that can be set (and, like the regularization parameter, *lambda*, it should be selected via testing and cross-validation methods).

The *alpha* parameter controls the baseline level of confidence weighting applied. A higher level of *alpha* tends to make the model more confident about the fact that missing data equates to no preference for the relevant user-item pair.

As an exercise, try to take the existing MovieLens dataset and convert it into an implicit dataset. One possible approach is to convert it to binary feedback (0s and 1s) by applying a threshold on the ratings at some level.

Another approach could be to convert the ratings’ values into confidence weights (for example, perhaps, low ratings could imply zero weights, or even negative weights, which are supported by MLlib’s implementation).

Train a model on this dataset and compare the results of the following section with those generated by your implicit model.
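As a possible starting point for the thresholding approach, the conversion itself can be sketched in plain Scala before applying it to the *ratings* RDD. The `Pref` case class is a hypothetical stand-in for MLlib's *Rating*, the sample ratings are made up, and the threshold of 3.0 is an arbitrary assumption that you should experiment with.

```scala
// Hypothetical stand-in for MLlib's Rating, so that the sketch runs without Spark
case class Pref(user: Int, product: Int, value: Double)

// Made-up explicit ratings on a 1 to 5 scale
val explicitRatings = Seq(Pref(1, 10, 5.0), Pref(1, 11, 2.0), Pref(2, 10, 3.0))

// Ratings at or above the threshold become 1.0 (preference); the rest become 0.0
val threshold = 3.0 // arbitrary choice for illustration
val binaryFeedback = explicitRatings.map { p =>
  p.copy(value = if (p.value >= threshold) 1.0 else 0.0)
}
binaryFeedback.foreach(println)
```

The same *map* logic should carry over to an RDD of *Rating* records, since RDDs expose a *map* method of the same shape.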

# Using the recommendation model

Now that we have our trained model, we’re ready to use it to make predictions. These predictions typically take one of two forms: recommendations for a given user and related or similar items for a given item.

## User recommendations

In this case, we would like to generate recommended items for a given user. This usually takes the form of a *top-K* list, that is, the *K* items that our model predicts will have the highest probability of the user liking them. This is done by computing the predicted score for each item and ranking the list based on this score.

The exact method to perform this computation depends on the model involved. For example, in user-based approaches, the ratings of similar users on items are used to compute the recommendations for a user, while in an item-based approach, the computation is based on the similarity of items the user has rated to the candidate items.

In matrix factorization, because we are modeling the ratings matrix directly, the predicted score can be computed as the vector dot product between a user-factor vector and an item-factor vector.

### Generating movie recommendations from the MovieLens 100k dataset

As MLlib’s recommendation model is based on matrix factorization, we can use the factor matrices computed by our model to compute predicted scores (or ratings) for a user. We will focus on the explicit rating case using MovieLens data; however, the approach is the same when using the implicit model.

The *MatrixFactorizationModel* class has a convenient *predict* method that will compute a predicted score for a given user and item combination:

```
val predictedRating = model.predict(789, 123)
14/03/30 16:10:10 INFO SparkContext: Starting job: lookup at MatrixFactorizationModel.scala:45
14/03/30 16:10:10 INFO DAGScheduler: Got job 30 (lookup at MatrixFactorizationModel.scala:45) with 1 output partitions (allowLocal=false)
...
14/03/30 16:10:10 INFO SparkContext: Job finished: lookup at MatrixFactorizationModel.scala:46, took 0.023077 s
predictedRating: Double = 3.128545693368485
```

As we can see, this model predicts a rating of *3.13* for user *789* and movie *123*.

Note that you might see different results than those shown in this section because the *ALS* model is initialized randomly. So, different runs of the model will lead to different solutions.

The *predict* method can also take an RDD of *(user, item)* IDs as the input and will generate predictions for each of these. We can use this method to make predictions for many users and items at the same time.

To generate the *top-K* recommended items for a user, *MatrixFactorizationModel* provides a convenience method called *recommendProducts*. This takes two arguments: *user* and *num*, where *user* is the user ID, and *num* is the number of items to recommend.

It returns the top *num* items ranked in the order of the predicted score. Here, the scores are computed as the dot product between the user-factor vector and each item-factor vector.
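The ranking logic described here can be sketched in plain Scala. This is not the MLlib implementation; the factor values and the `topK` helper are made up to show the idea of scoring every item by dot product, sorting by score, and keeping the first few.

```scala
// Hypothetical user-factor vector and item-factor vectors keyed by item ID
val userVec = Array(1.0, 0.5)
val itemVecs: Map[Int, Array[Double]] = Map(
  1 -> Array(1.0, 1.0), // score 1.5
  2 -> Array(3.0, 0.0), // score 3.0
  3 -> Array(0.0, 1.0), // score 0.5
  4 -> Array(2.0, 4.0)  // score 4.0
)

// Score every item by dot product, sort by descending score, and keep the first k
def topK(user: Array[Double], items: Map[Int, Array[Double]], k: Int): Seq[(Int, Double)] =
  items.toSeq
    .map { case (id, vec) => (id, user.zip(vec).map { case (a, b) => a * b }.sum) }
    .sortBy { case (_, score) => -score }
    .take(k)

val top2 = topK(userVec, itemVecs, 2)
top2.foreach(println) // (4,4.0) then (2,3.0)
```

Scoring every item for every request becomes expensive with large catalogs, which is one reason dedicated model-serving systems exist.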

Let’s generate the top *10* recommended items for user *789*:

```
val userId = 789
val K = 10
val topKRecs = model.recommendProducts(userId, K)
```

We now have the *10* movies with the highest predicted ratings for user *789*. If we print this out, we can inspect the top 10 recommendations for this user:

`println(topKRecs.mkString("\n"))`

You should see the following output on your console:

```
Rating(789,715,5.931851273771102)
Rating(789,12,5.582301095666215)
Rating(789,959,5.516272981542168)
Rating(789,42,5.458065302395629)
Rating(789,584,5.449949837103569)
Rating(789,750,5.348768847643657)
Rating(789,663,5.30832117499004)
Rating(789,134,5.278933936827717)
Rating(789,156,5.250959077906759)
Rating(789,432,5.169863417126231)
```

#### Inspecting the recommendations

We can give these recommendations a sense check by taking a quick look at the titles of the movies a user has rated and the recommended movies. First, we need to load the movie data. We’ll collect this data as a *Map[Int, String]* that maps the movie ID to the title:

```
val movies = sc.textFile("/PATH/ml-100k/u.item")
val titles = movies.map(line => line.split("\\|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap()
titles(123)
res68: String = Frighteners, The (1996)
```

For our user *789*, we can find out what movies they have rated, take the *10* movies with the highest rating, and then check the titles. We will do this now by first using the *keyBy* Spark function to create an RDD of key-value pairs from our *ratings* RDD, where the key will be the user ID. We will then use the *lookup* function to return just the ratings for this key (that is, that particular user ID) to the driver:

`val moviesForUser = ratings.keyBy(_.user).lookup(789)`

Let’s see how many movies this user has rated. This will be the *size* of the *moviesForUser* collection:

`println(moviesForUser.size)`

We will see that this user has rated *33* movies.

Next, we will take the 10 movies with the highest ratings by sorting the *moviesForUser* collection using the *rating* field of the *Rating* object. We will then extract the movie title for the relevant product ID attached to the *Rating* class from our mapping of movie titles and print out the top *10* titles with their ratings:

`moviesForUser.sortBy(-_.rating).take(10).map(rating => (titles(rating.product), rating.rating)).foreach(println)`

You will see the following output displayed:

```
(Godfather, The (1972),5.0)
(Trainspotting (1996),5.0)
(Dead Man Walking (1995),5.0)
(Star Wars (1977),5.0)
(Swingers (1996),5.0)
(Leaving Las Vegas (1995),5.0)
(Bound (1996),5.0)
(Fargo (1996),5.0)
(Last Supper, The (1995),5.0)
(Private Parts (1997),4.0)
```
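The same sort-and-take pattern works on any local Scala collection. Here is a small sketch that runs without Spark; *SimpleRating* is a hypothetical stand-in for MLlib's *Rating* class, and the titles and ratings are made up for illustration:

```scala
// Hypothetical stand-in for MLlib's Rating class, so the sketch runs without Spark
case class SimpleRating(user: Int, product: Int, rating: Double)

// Made-up titles and ratings, not taken from the dataset
val sampleTitles = Map(1 -> "Movie A", 2 -> "Movie B", 3 -> "Movie C")
val sampleRatings = Seq(
  SimpleRating(7, 1, 3.0),
  SimpleRating(7, 2, 5.0),
  SimpleRating(7, 3, 4.0)
)

// Negating the rating makes sortBy order from highest to lowest
val topTwoRated = sampleRatings
  .sortBy(r => -r.rating)
  .take(2)
  .map(r => (sampleTitles(r.product), r.rating))

topTwoRated.foreach(println)
```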

Now, let’s take a look at the top 10 recommendations for this user and see what the titles are using the same approach as the one we used earlier (note that the recommendations are already sorted):

```
topKRecs.map(rating => (titles(rating.product), rating.rating)).foreach(println)
(To Die For (1995),5.931851273771102)
(Usual Suspects, The (1995),5.582301095666215)
(Dazed and Confused (1993),5.516272981542168)
(Clerks (1994),5.458065302395629)
(Secret Garden, The (1993),5.449949837103569)
(Amistad (1997),5.348768847643657)
(Being There (1979),5.30832117499004)
(Citizen Kane (1941),5.278933936827717)
(Reservoir Dogs (1992),5.250959077906759)
(Fantasia (1940),5.169863417126231)
```

We leave it to you to decide whether these recommendations make sense.

## Item recommendations

Item recommendations are about answering the following question: for a certain item, what are the items most similar to it? Here, the precise definition of similarity is dependent on the model involved. In most cases, similarity is computed by comparing the vector representation of two items using some similarity measure. Common similarity measures include Pearson correlation and cosine similarity for real-valued vectors and Jaccard similarity for binary vectors.
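To make two of these measures concrete, here is a minimal plain-Scala sketch of Jaccard similarity and Pearson correlation. The function names are our own, and returning 0.0 for empty sets or constant vectors is a convention chosen for this sketch rather than part of either definition:

```scala
// Jaccard similarity for binary data, represented here as sets of item IDs:
// size of the intersection divided by size of the union
def jaccard(a: Set[Int], b: Set[Int]): Double = {
  val unionSize = a.union(b).size
  if (unionSize == 0) 0.0 else a.intersect(b).size.toDouble / unionSize
}

// Pearson correlation for real-valued vectors of equal length:
// covariance divided by the product of the standard deviations
def pearson(x: Array[Double], y: Array[Double]): Double = {
  require(x.length == y.length && x.nonEmpty, "vectors must be non-empty and of equal length")
  val meanX = x.sum / x.length
  val meanY = y.sum / y.length
  val dx = x.map(v => v - meanX)
  val dy = y.map(v => v - meanY)
  val cov = dx.zip(dy).map { case (a, b) => a * b }.sum
  val denom = math.sqrt(dx.map(d => d * d).sum) * math.sqrt(dy.map(d => d * d).sum)
  if (denom == 0.0) 0.0 else cov / denom
}

// Two of three distinct items are shared out of four in total
println(jaccard(Set(1, 2, 3), Set(2, 3, 4)))
// The second vector is an exact multiple of the first, so the correlation is (about) 1
println(pearson(Array(1.0, 2.0, 3.0), Array(2.0, 4.0, 6.0)))
```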

### Generating similar movies for the MovieLens 100K dataset

The current *MatrixFactorizationModel* API does not directly support item-to-item similarity computations. Therefore, we will need to create our own code to do this.

We will use the cosine similarity metric, and we will use the jblas linear algebra library (a dependency of MLlib) to compute the required vector dot products. This is similar to how the existing *predict* and *recommendProducts* methods work, except that we will use cosine similarity as opposed to just the dot product.

We would like to compare the factor vector of our chosen item with each of the other items, using our similarity metric. In order to perform linear algebra computations, we will first need to create a vector object out of the factor vectors, which are in the form of an *Array[Double]*. The jblas class *DoubleMatrix* takes an *Array[Double]* as the constructor argument, as follows:

```
import org.jblas.DoubleMatrix
val aMatrix = new DoubleMatrix(Array(1.0, 2.0, 3.0))
aMatrix: org.jblas.DoubleMatrix = [1.000000; 2.000000; 3.000000]
```

Note that, in jblas, vectors are represented as a one-dimensional *DoubleMatrix* object, while matrices are represented as a two-dimensional *DoubleMatrix* object.

We will need a method to compute the cosine similarity between two vectors. Cosine similarity is a measure of the angle between two vectors in an *n*-dimensional space. It is computed by first calculating the dot product between the vectors and then dividing the result by a denominator, which is the norm (or length) of each vector multiplied together (specifically, the L2-norm is used in cosine similarity). In this way, cosine similarity is a normalized dot product.

The cosine similarity measure takes on values between -1 and 1. A value of 1 means that the vectors point in exactly the same direction (completely similar), while a value of 0 means that they are orthogonal, which we interpret as no similarity. This measure is useful because it also captures negative similarity: a value of -1 means that the vectors point in exactly opposite directions, that is, they are completely dissimilar.

Let’s create our *cosineSimilarity* function here:

```
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
}
```

Note that we defined a return type of *Double* for this function. We are not required to do this, since Scala features type inference. However, it can often be useful to document return types for Scala functions.
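To see the three reference values of cosine similarity without needing jblas, here is a plain-Scala sketch that works directly on arrays. The name *cosineSimilarityArrays* and the sample vectors are our own, and the function assumes neither vector is all zeros (otherwise the division yields NaN):

```scala
// Plain-Scala cosine similarity on arrays (a stand-in for the jblas version)
def cosineSimilarityArrays(a: Array[Double], b: Array[Double]): Double = {
  require(a.length == b.length, "vectors must have the same length")
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  val normA = math.sqrt(a.map(x => x * x).sum) // L2-norm of a
  val normB = math.sqrt(b.map(x => x * x).sum) // L2-norm of b
  dot / (normA * normB)
}

val v = Array(3.0, 4.0)
// Same direction, different length: similarity is 1
val sameDirection = cosineSimilarityArrays(v, Array(6.0, 8.0))
// Perpendicular vectors: similarity is 0
val orthogonal = cosineSimilarityArrays(Array(1.0, 0.0), Array(0.0, 1.0))
// Opposite direction: similarity is -1
val oppositeDirection = cosineSimilarityArrays(v, Array(-3.0, -4.0))

println((sameDirection, orthogonal, oppositeDirection))
```

Because the result is normalized by both vector lengths, scaling either vector by a positive constant leaves the similarity unchanged.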

Let’s try it out on one of our item factors for item *567*. We will need to collect an item factor from our model; we will do this using the *lookup* method in much the same way as we did earlier to collect the ratings for a specific user. In the following lines of code, we also use the *head* function, since *lookup* returns a sequence of values, and we only need the first value (in fact, there will only be one value, which is the factor vector for this item).

Since this will be an *Array[Double]*, we will then need to create a *DoubleMatrix* object from it and compute the cosine similarity with itself:

```
val itemId = 567
val itemFactor = model.productFeatures.lookup(itemId).head
val itemVector = new DoubleMatrix(itemFactor)
cosineSimilarity(itemVector, itemVector)
```

A similarity metric should measure how close, in some sense, two vectors are to each other. Here, we can see that our cosine similarity metric tells us that this item vector is identical to itself, which is what we would expect:

`res113: Double = 1.0`

Now, we are ready to apply our similarity metric to each item:

```
val sims = model.productFeatures.map{ case (id, factor) =>
  val factorVector = new DoubleMatrix(factor)
  // similarity between this item's factor vector and our chosen item's vector
  val sim = cosineSimilarity(factorVector, itemVector)
  (id, sim)
}
```

Next, we can compute the top 10 most similar items by sorting on the similarity score for each item:

```
// recall we defined K = 10 earlier
val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
```

In the preceding code snippet, we used Spark’s *top* function, which is an efficient way to compute *top-K* results in a distributed fashion, instead of using *collect* to return all the data to the driver and sorting it locally (remember that we could be dealing with millions of users and items in the case of recommendation models).

We need to tell Spark how to sort the *(item id, similarity score)* pairs in the *sims* RDD. To do this, we will pass an extra argument to *top*, which is a Scala *Ordering* object that tells Spark that it should sort by the value in the key-value pair (that is, sort by *similarity*).
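The role of the *Ordering* object can be seen on a local collection. The following sketch uses made-up *(item id, similarity)* pairs and sorts them locally; it only illustrates how the ordering selects the value to compare and is not how Spark's distributed *top* is implemented:

```scala
// Made-up (item id, similarity) pairs
val samplePairs = Seq((1, 0.2), (2, 0.9), (3, -0.4), (4, 0.7))

// An Ordering that compares pairs by their second element (the similarity)
val bySimilarity = Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity }

// Local counterpart of top(2): sort descending under that ordering and keep the first two
val topTwoBySimilarity = samplePairs.sorted(bySimilarity.reverse).take(2)

// The same ordering can be passed to other methods, such as max
val mostSimilar = samplePairs.max(bySimilarity)

println(topTwoBySimilarity)
println(mostSimilar)
```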

Finally, we can print the 10 items with the highest computed similarity metric to our given item:

`println(sortedSims.take(10).mkString("\n"))`

You will see output like the following one:

```
(567,1.0000000000000002)
(1471,0.6932331537649621)
(670,0.6898690594544726)
(201,0.6897964975027041)
(343,0.6891221044611473)
(563,0.6864214133620066)
(294,0.6812075443259535)
(413,0.6754663844488256)
(184,0.6702643811753909)
(109,0.6594872765176396)
```

Not surprisingly, we can see that the top-ranked similar item is our item. The rest are the other items in our set of items, ranked in order of our similarity metric.

#### Inspecting the similar items

Let’s see what the title of our chosen movie is:

```
println(titles(itemId))
Wes Craven's New Nightmare (1994)
```

As we did for user recommendations, we can sense check our item-to-item similarity computations and take a look at the titles of the most similar movies. This time, we will take the top 11 so that we can exclude our given movie, which ranks first. So, we will take the elements at positions 1 to 10 of the zero-indexed result, which is what *slice(1, 11)* returns:

```
val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
sortedSims2.slice(1, 11).map{ case (id, sim) => (titles(id), sim) }.mkString("\n")
```

You will see the movie titles and scores displayed similar to this output:

```
(Hideaway (1995),0.6932331537649621)
(Body Snatchers (1993),0.6898690594544726)
(Evil Dead II (1987),0.6897964975027041)
(Alien: Resurrection (1997),0.6891221044611473)
(Stephen King's The Langoliers (1995),0.6864214133620066)
(Liar Liar (1997),0.6812075443259535)
(Tales from the Crypt Presents: Bordello of Blood (1996),0.6754663844488256)
(Army of Darkness (1993),0.6702643811753909)
(Mystery Science Theater 3000: The Movie (1996),0.6594872765176396)
(Scream (1996),0.6538249646863378)
```

Once again note that you might see quite different results due to random model initialization.

Now that you have computed similar items using cosine similarity, see if you can do the same with the user-factor vectors to compute similar users for a given user.

# Evaluating the performance of recommendation models

How do we know whether the model we have trained is a good model? We need to be able to evaluate its predictive performance in some way. **Evaluation metrics** are measures of a model’s predictive capability or accuracy. Some are direct measures of how well a model predicts the model’s target variable (such as Mean Squared Error), while others are concerned with how well the model performs at predicting things that might not be directly optimized in the model but are often closer to what we care about in the real world (such as Mean average precision).

Evaluation metrics provide a standardized way of comparing the performance of the same model with different parameter settings and of comparing performance across different models. Using these metrics, we can perform model selection to choose the best-performing model from the set of models we wish to evaluate.

Here, we will show you how to calculate two common evaluation metrics used in recommender systems and collaborative filtering models: Mean Squared Error and Mean average precision at K.

## Mean Squared Error

The **Mean Squared Error** (**MSE**) is a direct measure of the reconstruction error of the user-item rating matrix. It is also the objective function being minimized in certain models, specifically many matrix-factorization techniques, including *ALS*. As such, it is commonly used in explicit ratings settings.

It is defined as the sum of the squared errors divided by the number of observations. The squared error, in turn, is the square of the difference between the predicted rating for a given user-item pair and the actual rating.

We will use our user *789* as an example. Let’s take the first rating for this user from the *moviesForUser* set of *Ratings* that we previously computed:

```
val actualRating = moviesForUser.take(1)(0)
actualRating: org.apache.spark.mllib.recommendation.Rating = Rating(789,1012,4.0)
```

We will see that the rating for this user-item combination is 4. Next, we will compute the model’s predicted rating:

```
val predictedRating = model.predict(789, actualRating.product)
...
14/04/13 13:01:15 INFO SparkContext: Job finished: lookup at MatrixFactorizationModel.scala:46, took 0.025404 s
predictedRating: Double = 4.001005374200248
```

We will see that the predicted rating is about 4, very close to the actual rating. Finally, we will compute the squared error between the actual rating and the predicted rating:

```
val squaredError = math.pow(predictedRating - actualRating.rating, 2.0)
squaredError: Double = 1.010777282523947E-6
```

So, in order to compute the overall MSE for the dataset, we need to compute this squared error for each *(user, movie, actual rating, predicted rating)* entry, sum them up, and divide them by the number of ratings. We will do this in the following code snippet.

Note the following code is adapted from the Apache Spark programming guide for ALS at http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html.

First, we will extract the user and product IDs from the *ratings* RDD and make predictions for each user-item pair using *model.predict*. We will use the user-item pair as the key and the predicted rating as the value:

```
val usersProducts = ratings.map{ case Rating(user, product, rating) => (user, product)}
val predictions = model.predict(usersProducts).map{
case Rating(user, product, rating) => ((user, product), rating)
}
```

Next, we extract the actual ratings and also map the *ratings* RDD so that the user-item pair is the key and the actual rating is the value. Now that we have two RDDs with the same form of key, we can join them together to create a new RDD with the actual and predicted ratings for each user-item combination:

```
val ratingsAndPredictions = ratings.map{
case Rating(user, product, rating) => ((user, product), rating)
}.join(predictions)
```
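The effect of the join can be sketched with local Scala maps. The keys and ratings below are made up; the point is that only keys present on both sides survive, and that the value from the left-hand side (the actual rating) comes first in the resulting pair, just as it does in Spark's *join*:

```scala
// Made-up actual and predicted ratings keyed by (user, product)
val actualByKey = Map((1, 10) -> 4.0, (1, 11) -> 3.0, (2, 10) -> 5.0)
val predictedByKey = Map((1, 10) -> 3.5, (2, 10) -> 4.5, (2, 12) -> 2.0)

// Inner join: keep only keys found in both maps and pair up their values,
// with the left-hand (actual) value first
val joinedByKey = actualByKey.collect {
  case (key, actual) if predictedByKey.contains(key) => (key, (actual, predictedByKey(key)))
}

joinedByKey.foreach(println)
```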

Finally, we will compute the MSE by summing up the squared errors using *reduce* and dividing by the number of records, which we obtain using the *count* method:

```
val MSE = ratingsAndPredictions.map{
case ((user, product), (actual, predicted)) => math.pow((actual - predicted), 2)
}.reduce(_ + _) / ratingsAndPredictions.count
println("Mean Squared Error = " + MSE)
Mean Squared Error = 0.08231947642632852
```

It is common to use the **Root Mean Squared Error** (**RMSE**), which is just the square root of the MSE metric. This is somewhat more interpretable, as it is in the same units as the underlying data (that is, the ratings in this case). It is closely related to the standard deviation of the differences between the predicted and actual ratings (the two coincide when the mean difference is zero). We can compute it simply as follows:

```
val RMSE = math.sqrt(MSE)
println("Root Mean Squared Error = " + RMSE)
Root Mean Squared Error = 0.2869137090247319
```
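Both metrics are easy to check by hand on a small local collection. Here is a sketch with four made-up *(actual, predicted)* pairs, whose squared errors are 1, 0, 1, and 4:

```scala
// Made-up (actual, predicted) rating pairs
val samplePairs = Seq((4.0, 3.0), (5.0, 5.0), (2.0, 3.0), (3.0, 1.0))

// Sum of squared errors divided by the number of observations: (1 + 0 + 1 + 4) / 4
val sampleMSE = samplePairs.map { case (actual, predicted) =>
  math.pow(actual - predicted, 2)
}.sum / samplePairs.size

// RMSE is the square root of the MSE, so it is back in rating units
val sampleRMSE = math.sqrt(sampleMSE)

println("Mean Squared Error = " + sampleMSE)
println("Root Mean Squared Error = " + sampleRMSE)
```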

## Mean average precision at K

**Mean average precision at K** (**MAPK**) is the mean of the **average precision at K** (**APK**) metric across all instances in the dataset. APK is a metric commonly used in information retrieval. APK is a measure of the average relevance scores of a set of the *top-K* documents presented in response to a query. For each query instance, we will compare the set of *top-K* results with the set of actual relevant documents (that is, a ground truth set of relevant documents for the query).

In the APK metric, the order of the result set matters, in that, the APK score would be higher if the result documents are both relevant and the relevant documents are presented higher in the results. It is, thus, a good metric for recommender systems in that typically we would compute the *top-K* recommended items for each user and present these to the user. Of course, we prefer models where the items with the highest predicted scores (which are presented at the top of the list of recommendations) are, in fact, the most relevant items for the user. APK and other ranking-based metrics are also more appropriate evaluation measures for implicit datasets; here, MSE makes less sense.

In order to evaluate our model, we can use APK, where each user is the equivalent of a query, and the set of *top-K* recommended items is the document result set. The relevant documents (that is, the ground truth) in this case are the set of items that a user interacted with. Hence, APK attempts to measure how good our model is at predicting items that a user will find relevant and choose to interact with.

The code for the following average precision computation is based on https://github.com/benhamner/Metrics.

More information on MAPK can be found at https://www.kaggle.com/wiki/MeanAveragePrecision.

Our function to compute the APK is shown here:

```
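def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {
  val predK = predicted.take(k)
  var score = 0.0
  var numHits = 0.0
  for ((p, i) <- predK.zipWithIndex if actual.contains(p)) { // reconstructed from here on, following apk in benhamner/Metrics
    numHits += 1.0
    score += numHits / (i.toDouble + 1.0) // precision at rank i + 1
  }
  if (actual.isEmpty) 0.0 else score / scala.math.min(actual.size, k).toDouble // 0.0 for no relevant items is an assumption
}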
```

As you can see, this takes as input a list of *actual* item IDs that are associated with the user, another list of *predicted* IDs that our model estimates to be relevant for the user, and the cutoff *k*.
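To get a feel for how the ordering of results affects the score, here is a compact, illustrative version of the metric applied to made-up item IDs. The name *apkSketch* is hypothetical, and returning 0.0 when there are no relevant items is a convention chosen for this sketch:

```scala
// Illustrative average precision at K; names and data are hypothetical
def apkSketch(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {
  // Keep the relevant predictions together with their 0-based rank i
  val hits = predicted.take(k).zipWithIndex.filter { case (p, _) => actual.contains(p) }
  // The n-th hit (counting from 1) found at rank i contributes a precision of n / (i + 1)
  val score = hits.zipWithIndex.map { case ((_, i), n) => (n + 1).toDouble / (i + 1) }.sum
  if (actual.isEmpty) 0.0 else score / math.min(actual.size, k)
}

val relevant = Seq(1, 2, 3)
// Two relevant items at the top of the list
val hitsEarly = apkSketch(relevant, Seq(1, 2, 9, 8), 4)
// The same two relevant items, but at the bottom of the list
val hitsLate = apkSketch(relevant, Seq(9, 8, 1, 2), 4)
// No relevant items at all
val noHits = apkSketch(relevant, Seq(7, 8, 9, 6), 4)

println((hitsEarly, hitsLate, noHits))
```

The first list scores higher than the second even though both contain the same two relevant items, which is the order sensitivity described earlier.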

We can compute the APK metric for our example user *789* as follows. First, we will extract the actual movie IDs for the user:

```
val actualMovies = moviesForUser.map(_.product)
actualMovies: Seq[Int] = ArrayBuffer(1012, 127, 475, 93, 1161, 286, 293, 9, 50, 294, 181, 1, 1008, 508, 284, 1017, 137, 111, 742, 248, 249, 1007, 591, 150, 276, 151, 129, 100, 741, 288, 762, 628, 124)
```

We will then use the movie recommendations we made previously to compute the APK score using *K = 10*:

```
val predictedMovies = topKRecs.map(_.product)
predictedMovies: Array[Int] = Array(27, 497, 633, 827, 602, 849, 401, 584, 1035, 1014)
val apk10 = avgPrecisionK(actualMovies, predictedMovies, 10)
apk10: Double = 0.0
```

In this case, we can see that our model is not doing a very good job of predicting relevant movies for this user as the APK score is 0.

In order to compute the APK for each user and average them to compute the overall MAPK, we will need to generate the list of recommendations for each user in our dataset. While this can be fairly intensive on a large scale, we can distribute the computation using Spark. However, one limitation is that each worker must have the full item-factor matrix available so that it can compute the dot product between the relevant user vector and all item vectors. This can be a problem when the number of items is extremely high, as the item matrix must fit in the memory of one machine.

There is actually no easy way around this limitation. One possible approach is to only compute recommendations for a subset of items from the total item set, using approximate techniques such as Locality Sensitive Hashing (http://en.wikipedia.org/wiki/Locality-sensitive_hashing).

We will now see how to go about this. First, we will collect the item factors and form a *DoubleMatrix* object from them:

```
val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
println(itemMatrix.rows, itemMatrix.columns)
(1682,50)
```

This gives us a matrix with *1682* rows and *50* columns, as we would expect from *1682* movies with a factor dimension of *50*. Next, we will distribute the item matrix as a broadcast variable so that it is available on each worker node:

```
val imBroadcast = sc.broadcast(itemMatrix)
14/04/13 21:02:01 INFO MemoryStore: ensureFreeSpace(672960) called with curMem=4006896, maxMem=311387750
14/04/13 21:02:01 INFO MemoryStore: Block broadcast_21 stored as values to memory (estimated size 657.2 KB, free 292.5 MB)
imBroadcast: org.apache.spark.broadcast.Broadcast[org.jblas.DoubleMatrix] = Broadcast(21)
```

Now we are ready to compute the recommendations for each user. We will do this by applying a *map* function to each user factor within which we will perform a matrix multiplication between the user-factor vector and the movie-factor matrix. The result is a vector (of length *1682*, that is, the number of movies we have) with the predicted rating for each movie. We will then sort these predictions by the predicted rating:

```
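val allRecs = model.userFeatures.map{ case (userId, array) =>
  val userVector = new DoubleMatrix(array)
  // (items x rank) matrix times (rank x 1) user vector gives one score per item
  val scores = imBroadcast.value.mmul(userVector)
  // sort by descending score, keeping each score's row index
  val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
  // add 1 to convert 0-based row indices to 1-based movie IDs
  val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
  (userId, recommendedIds)
}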
allRecs: org.apache.spark.rdd.RDD[(Int, Seq[Int])] = MappedRDD[269] at map at <console>:29
```

As we can see, we now have an RDD that contains a list of movie IDs for each user ID. These movie IDs are sorted in order of the estimated rating.

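Note that we needed to add 1 to the returned movie IDs (the *+ 1* in the preceding code snippet), as the item-factor matrix is 0-indexed, while our movie IDs start at 1. This mapping assumes that the collected item factors are ordered by movie ID; as far as we are aware, *productFeatures* does not guarantee this ordering, so sorting the factors by ID before collecting them is a safer approach.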
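The scoring and ranking step can be sketched in plain Scala on a tiny, made-up factor matrix. All names and values here are hypothetical, and the rows are assumed to be in order of item ID (1, 2, 3):

```scala
// Made-up item-factor matrix: one row per item, in order of item ID (1, 2, 3)
val sampleItemFactors = Array(
  Array(1.0, 0.0),
  Array(0.0, 1.0),
  Array(1.0, 1.0)
)
val sampleUserVector = Array(2.0, 1.0)

// Matrix-vector product: one dot product per item row gives one predicted score per item
val sampleScores = sampleItemFactors.map { row =>
  row.zip(sampleUserVector).map { case (f, u) => f * u }.sum
}

// Pair each score with its 0-based row index, sort by descending score,
// then add 1 to turn row indices into 1-based item IDs
val rankedIds = sampleScores.zipWithIndex.sortBy(-_._1).map(_._2 + 1).toSeq

println(rankedIds.mkString(", "))
```

If the rows were not in ID order, the final *+ 1* would map scores to the wrong items, which is why the ordering assumption matters.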

We also need the list of movie IDs for each user to pass into our APK function as the *actual* argument. We already have the *ratings* RDD ready, so we can extract just the user and movie IDs from it.

If we use Spark’s *groupBy* operator, we will get an RDD that contains a list of *(userid, movieid)* pairs for each user ID (as the user ID is the key on which we perform the *groupBy* operation):

```
val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product) }.groupBy(_._1)
userMovies: org.apache.spark.rdd.RDD[(Int, Seq[(Int, Int)])] = MapPartitionsRDD[277] at groupBy at <console>:21
```

Finally, we can use Spark’s *join* operator to join these two RDDs together on the user ID key. Then, for each user, we have the list of actual and predicted movie IDs that we can pass to our APK function. In a manner similar to how we computed MSE, we will sum each of these APK scores using a *reduce* action and divide by the number of users (that is, the count of the *allRecs* RDD):

```
val K = 10
val MAPK = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
val actual = actualWithIds.map(_._2).toSeq
avgPrecisionK(actual, predicted, K)
}.reduce(_ + _) / allRecs.count
println("Mean Average Precision at K = " + MAPK)
Mean Average Precision at K = 0.030486963254725705
```

Our model achieves a fairly low MAPK. However, note that typical values for recommendation tasks are usually relatively low, especially if the item set is extremely large.
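The group, join, and average steps can be followed end to end on a toy example using local collections. Everything in this sketch is hypothetical: the helper *apkLocal* is a compact illustrative version of average precision at K, and the interactions and recommendations are made up:

```scala
// Compact illustrative average precision at K for one user (hypothetical helper)
def apkLocal(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {
  val hits = predicted.take(k).zipWithIndex.filter { case (p, _) => actual.contains(p) }
  val score = hits.zipWithIndex.map { case ((_, i), n) => (n + 1).toDouble / (i + 1) }.sum
  if (actual.isEmpty) 0.0 else score / math.min(actual.size, k)
}

// Made-up (user, movie) interactions and ranked recommendations per user
val interactions = Seq((1, 10), (1, 11), (2, 20), (2, 21), (2, 22))
val recsByUser = Map(1 -> Seq(10, 99, 11), 2 -> Seq(98, 97, 96))

// Local counterpart of groupBy: the movie IDs each user interacted with
val actualByUser = interactions.groupBy(_._1).map { case (user, pairs) =>
  (user, pairs.map(_._2))
}

// Local counterpart of join plus map: one APK score per user present on both sides
val apkScores = recsByUser.toSeq.collect {
  case (user, predicted) if actualByUser.contains(user) =>
    apkLocal(actualByUser(user), predicted, 3)
}

// MAPK is the mean of the per-user APK scores
val sampleMAPK = apkScores.sum / apkScores.size

println("Mean Average Precision at K = " + sampleMAPK)
```

Here, user 1 has two relevant recommendations and user 2 has none, so the second user's score of 0 pulls the mean down, much as user *789* does in our real dataset.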

Try out a few parameter settings for *lambda* and *rank* (and *alpha* if you are using the implicit version of ALS) and see whether you can find a model that performs better based on the RMSE and MAPK evaluation metrics.

## Using MLlib’s built-in evaluation functions

While we have computed MSE, RMSE, and MAPK from scratch, and it is a useful learning exercise to do so, MLlib provides convenience functions to do this for us in the *RegressionMetrics* and *RankingMetrics* classes.

### RMSE and MSE

First, we will compute the MSE and RMSE metrics using *RegressionMetrics*. We will instantiate a *RegressionMetrics* instance by passing in an RDD of key-value pairs that represent the predicted and true values for each data point, as shown in the following code snippet. Here, we will again use the *ratingsAndPredictions* RDD we computed in our earlier example:

```
import org.apache.spark.mllib.evaluation.RegressionMetrics
val predictedAndTrue = ratingsAndPredictions.map { case ((user, product), (actual, predicted)) => (predicted, actual) }
val regressionMetrics = new RegressionMetrics(predictedAndTrue)
```

We can then access various metrics, including MSE and RMSE. We will print out these metrics here:

```
println("Mean Squared Error = " + regressionMetrics.meanSquaredError)
println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)
```

You will see that the output for MSE and RMSE is exactly the same as the metrics we computed earlier:

```
Mean Squared Error = 0.08231947642632852
Root Mean Squared Error = 0.2869137090247319
```

### MAP

As we did for MSE and RMSE, we can compute ranking-based evaluation metrics using MLlib’s *RankingMetrics* class. Similarly to our own average precision function, we need to pass in an RDD of key-value pairs, where the key is an *Array* of predicted item IDs for a user, while the value is an array of actual item IDs.

The implementation of the average precision at K function in *RankingMetrics* is slightly different from ours, so we will get different results. However, the computation of the overall mean average precision (MAP, which does not use a threshold at K) is the same as our function if we select *K* to be very high (say, at least as high as the number of items in our item set).

First, we will calculate MAP using *RankingMetrics*:

```
import org.apache.spark.mllib.evaluation.RankingMetrics
val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
val actual = actualWithIds.map(_._2)
(predicted.toArray, actual.toArray)
}
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
```

You will see the following output:

`Mean Average Precision = 0.07171412913757183`

Next, we will use our function to compute the MAP in exactly the same way as we did previously, except that we set *K* to a very high value, say *2000*:

```
val MAPK2000 = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
val actual = actualWithIds.map(_._2).toSeq
avgPrecisionK(actual, predicted, 2000)
}.reduce(_ + _) / allRecs.count
println("Mean Average Precision = " + MAPK2000)
```

You will see that the MAP from our own function matches the one computed using *RankingMetrics*, up to floating-point rounding in the last digits:

`Mean Average Precision = 0.07171412913757186`

We will not cover cross-validation in this article. However, note that the usual cross-validation techniques can be used to evaluate recommendation models, using performance metrics such as MSE, RMSE, and MAP, which we covered in this section.

# Summary

In this article, we used Spark’s MLlib library to train a collaborative filtering recommendation model, and you learned how to use this model to make predictions for the items that a given user might have a preference for. We also used our model to find items that are similar or related to a given item. Finally, we explored common metrics to evaluate the predictive capability of our recommendation model.

To learn more about Spark, the following books published by Packt Publishing (https://www.packtpub.com/) are recommended:

- Fast Data Processing with Spark – Second Edition (https://www.packtpub.com/big-data-and-business-intelligence/fast-data-processing-spark-second-edition)
- Spark Cookbook (https://www.packtpub.com/big-data-and-business-intelligence/spark-cookbook)
