MinHash is an LSH family for Jaccard distance where input features are sets of natural numbers. Jaccard distance of two sets is defined by the cardinality of their intersection and union:

\[
d(\mathbf{A}, \mathbf{B}) = 1 - \frac{|\mathbf{A} \cap \mathbf{B}|}{|\mathbf{A} \cup \mathbf{B}|}
\]
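For instance, take two of the sets that appear later in this example: {2, 3, 4} (row id 1 of dfA) and {1, 3} (the search key). They share one element out of four distinct elements in total, so their distance is 1 - 1/4 = 0.75. A quick plain-Scala check (illustrative only, not part of the Spark API):

// Plain-Scala sanity check of the Jaccard distance formula.
val A = Set(2, 3, 4) // the set behind row id 1 of dfA below
val B = Set(1, 3)    // the set behind the search key below
val d = 1.0 - A.intersect(B).size.toDouble / A.union(B).size
// |A ∩ B| = 1, |A ∪ B| = 4, so d = 0.75, matching distCol in the output below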
MinHash applies a random hash function g to each element in the set and takes the minimum of all hashed values:

\[
h(\mathbf{A}) = \min_{a \in \mathbf{A}}(g(a))
\]
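As a minimal sketch of one such hash function: an affine map g(a) = (c1 * a + c2) mod p over a large prime is a common choice for MinHash. The prime and coefficients below are assumptions for illustration, not Spark's internal values:

// One illustrative MinHash function; the model draws numHashTables such functions at random.
val p = 2038074743L            // a large prime (assumed for this sketch)
val (c1, c2) = (1048573L, 97L) // random coefficients, fixed here for repeatability
def g(elem: Int): Long = (c1 * elem + c2) % p
def minHash(set: Set[Int]): Long = set.map(g).min
// Two sets agree on minHash with probability equal to their Jaccard similarity,
// which is the property that makes MinHash an LSH family.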
The input sets for MinHash are represented as binary vectors, where the vector indices represent the elements themselves and the non-zero values in the vector represent the presence of those elements in the set. While both dense and sparse vectors are supported, sparse vectors are typically recommended for efficiency. For example, Vectors.sparse(10, Seq((2, 1.0), (3, 1.0), (5, 1.0))) means there are 10 elements in the space; this set contains elements 2, 3, and 5. All non-zero values are treated as binary "1" values.
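To make the representation concrete, the same set can be built either way (a small illustration; the dense form simply spells out all 10 positions):

// The set {2, 3, 5} in a 10-element space, as equivalent sparse and dense vectors.
val sparseSet = Vectors.sparse(10, Seq((2, 1.0), (3, 1.0), (5, 1.0)))
val denseSet = Vectors.dense(0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)
// Any non-zero value (e.g. 2.0) at an index marks that element as present just the same.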
Note: Empty sets cannot be transformed by MinHash, which means any input vector must have at least 1 non-zero entry.
import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
// The example assumes a running SparkSession; in spark-shell one is provided as `spark`.
val spark = SparkSession.builder()
  .appName("MinHashLSHExample")
  .getOrCreate()
val dfA = spark.createDataFrame(Seq(
  (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
  (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
  (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")
val dfB = spark.createDataFrame(Seq(
  (3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
  (4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
  (5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")
val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))
val mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes")
val model = mh.fit(dfA)
// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return fewer than 2 rows when not enough approximate near-neighbor candidates are
// found.
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
/*
Output:
The hashed dataset where hashed values are stored in the column 'hashes':
+---+--------------------+--------------------+
| id| features| hashes|
+---+--------------------+--------------------+
| 0|(6,[0,1,2],[1.0,1...|[[2.25592966E8], ...|
| 1|(6,[2,3,4],[1.0,1...|[[2.25592966E8], ...|
| 2|(6,[0,2,4],[1.0,1...|[[2.25592966E8], ...|
+---+--------------------+--------------------+
Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:
+---+---+---------------+
|idA|idB|JaccardDistance|
+---+---+---------------+
| 1| 4| 0.5|
| 0| 5| 0.5|
| 1| 5| 0.5|
| 2| 5| 0.5|
+---+---+---------------+
Approximately searching dfA for 2 nearest neighbors of the key:
+---+--------------------+--------------------+-------+
| id| features| hashes|distCol|
+---+--------------------+--------------------+-------+
| 1|(6,[2,3,4],[1.0,1...|[[2.25592966E8], ...| 0.75|
+---+--------------------+--------------------+-------+
*/
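When the same model serves many queries, it can pay to hash each dataset once and reuse the result, as the comments in the example note. A small sketch under that assumption, reusing the model and data from above (caching is optional):

// Hash once, then query repeatedly without re-computing the hashes.
val transformedA = model.transform(dfA).cache()
val transformedB = model.transform(dfB).cache()
model.approxSimilarityJoin(transformedA, transformedB, 0.6, "JaccardDistance").show()
model.approxNearestNeighbors(transformedA, key, 2).show()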