Collaborative filtering
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import spark.implicits._
// Read the sample ratings file as a Dataset[String], one element per text line.
val ds = spark.read.textFile(
  "file:///opt/spark/data/mllib/als/sample_movielens_ratings.txt"
)
// View the lines as a DataFrame (the single column is named "value", see the output below).
val df = ds.toDF()
// Print the first 4 raw lines.
df.show(4)
/*
+-------------------+
| value|
+-------------------+
|0::2::3::1424380312|
|0::3::1::1424380312|
|0::5::2::1424380312|
|0::9::4::1424380312|
+-------------------+
only showing top 4 rows
*/
// Split every raw line on "::" and expose the first four pieces as the
// string columns col1..col4 (piece i becomes the i-th name in the list).
val df1 = {
  val pieces = split($"value", "::")
  val namedPieces = Seq("col1", "col2", "col3", "col4").zipWithIndex.map {
    case (name, idx) => pieces.getItem(idx).as(name)
  }
  df.select(namedPieces: _*)
}
// Print the first 5 parsed rows.
df1.show(5)
/*
+----+----+----+----------+
|col1|col2|col3| col4|
+----+----+----+----------+
| 0| 2| 3|1424380312|
| 0| 3| 1|1424380312|
| 0| 5| 2|1424380312|
| 0| 9| 4|1424380312|
| 0| 11| 1|1424380312|
+----+----+----+----------+
only showing top 5 rows
*/
// Rename the parsed string columns and cast them to the types used downstream:
// userId/movieId as int, rating as float, timestamp as long.
val df_with_datatype = df1.select(
  $"col1".cast("int").as("userId"),
  $"col2".cast("int").as("movieId"),
  $"col3".cast("float").as("rating"),
  $"col4".cast("long").as("timestamp")
)
// Keep only rows where all four typed columns are non-null
// (presumably to discard values that could not be cast — confirm against the input data).
val df_filter_null = df_with_datatype.filter(
  Seq("userId", "movieId", "rating", "timestamp")
    .map(name => col(name).isNotNull)
    .reduce(_ && _)
)
// Random 80/20 split into training and test sets (no seed is given, so the split varies per run).
val Array(training, test) = df_filter_null.randomSplit(Array(0.8, 0.2))
// Configure the ALS estimator: which columns hold the user ids, item ids and
// ratings, plus the regularization parameter (0.01) and iteration cap (5).
val als = new ALS()
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setRegParam(0.01)
  .setMaxIter(5)
// Fit the recommendation model on the training split only.
val model = als.fit(training)
// Score the held-out test split.
// The cold start strategy is set to "drop" on the model first, so that the
// evaluation metric below does not come out as NaN.
val predictions = model.setColdStartStrategy("drop").transform(test)
// Compare the "prediction" column against the true "rating" column using RMSE.
val evaluator = new RegressionEvaluator()
  .setLabelCol("rating")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)
// Generate top 10 movie recommendations for a specified set of users.
// The ids are taken from df_filter_null, the full cleaned ratings frame
// (before the train/test split); this script defines no value named `ratings`.
val users = df_filter_null.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies,
// again picking the ids from df_filter_null.
val movies = df_filter_null.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
/*
Root-mean-square error = 1.9694419904333391
*/
// Print the first 5 per-movie recommendation rows without truncating the wide column.
movieRecs.show(5, truncate = false)
/*
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movieId|recommendations |
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|31 |[[12, 3.9116766], [6, 3.2745547], [14, 3.123588], [8, 3.0202165], [7, 2.7595513], [9, 2.2341797], [17, 1.9346341], [21, 1.8659948], [15, 1.8418702], [25, 1.713342]] |
|85 |[[18, 5.2397203], [16, 4.9212985], [8, 4.5497913], [25, 4.4582], [7, 4.301504], [0, 4.2436285], [21, 3.2146235], [6, 3.1708229], [28, 3.1625175], [14, 2.8124433]] |
|65 |[[23, 4.797905], [18, 2.9681492], [0, 2.8323104], [14, 2.2709212], [5, 2.2471752], [11, 2.1821382], [25, 2.0922368], [15, 2.0393076], [1, 1.911113], [6, 1.8635347]] |
|53 |[[22, 6.043746], [8, 4.958887], [26, 4.9108295], [21, 4.8912897], [20, 3.9612412], [24, 3.9306047], [14, 2.8171947], [28, 2.5926101], [16, 2.2719743], [3, 2.2049422]]|
|78 |[[5, 1.380609], [25, 1.25078], [23, 1.1319652], [29, 1.1171162], [9, 1.0972868], [17, 1.0794944], [2, 1.0715297], [22, 1.055237], [4, 1.0450754], [26, 1.0372707]] |
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows
*/
Last updated