While trying to put together a solution to "Perform a typed join in Scala with Spark Datasets", I ran into something I don't understand.

In the test below, the signature of `innerJoin` is `def innerJoin[U, K](ds2: Dataset[U])(f: T => K, g: U => K)(implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)])`, but I call it with `f: Foo => String` and `g: Bar => Int`. I would expect an error at compile time, yet it compiles just fine. Why?
What actually happens is that the call compiles, and the test then fails at runtime with `java.lang.ClassNotFoundException: scala.Any` when Spark tries to create a product encoder for the `((K, Foo), (K, Bar))` tuples. `Any` evidently shows up as the common supertype of `Int` and `String`.
```scala
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import org.scalatest.Matchers
import org.scalatest.testng.TestNGSuite
import org.testng.annotations.Test

case class Foo(a: String)
case class Bar(b: Int)

class JoinTest extends TestNGSuite with Matchers {
  import JoinTest._

  @Test
  def testJoin(): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("test").getOrCreate()

    import spark.implicits._

    val ds1 = spark.createDataset(Seq(Foo("a")))
    val ds2 = spark.createDataset(Seq(Bar(123)))

    // K is inferred as Any here; fails at runtime with
    // java.lang.ClassNotFoundException: scala.Any
    val jd = ds1.innerJoin(ds2)(_.a, _.b)

    jd.count shouldBe 0
  }
}

object JoinTest {
  implicit class Joins[T](ds1: Dataset[T]) {
    def innerJoin[U, K](ds2: Dataset[U])(f: T => K, g: U => K)
      (implicit e1: Encoder[(K, T)],
       e2: Encoder[(K, U)],
       e3: Encoder[(T, U)]): Dataset[(T, U)] = {
      val ds1_ = ds1.map(x => (f(x), x))
      val ds2_ = ds2.map(x => (g(x), x))
      ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1"))
        .map(x => (x._1._2, x._2._2))
    }
  }
}
```
You're right: `Any` is being inferred as `K`, the common supertype of `String` and `Int`. `Function1` is covariant in its output type, so a `Foo => String` is a valid `Foo => Any`.
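A Spark-free sketch of what's going on (the `sameKey` helper below is made up purely to mirror `innerJoin`'s `(f, g)` parameter list):

```scala
case class Foo(a: String)
case class Bar(b: Int)

// Function1 is covariant in its result type,
// so a Foo => String is also a Foo => Any.
val asAny: Foo => Any = (foo: Foo) => foo.a // compiles

// Hypothetical helper that, like innerJoin, asks both
// functions to return the same key type K.
def sameKey[T, U, K](f: T => K, g: U => K): Unit = ()

// The compiler doesn't reject the mismatch; it unifies K to the
// least upper bound of String and Int, which is Any.
sameKey((foo: Foo) => foo.a, (bar: Bar) => bar.b)
```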
The common way to solve this kind of problem is to use two type parameters and an implicit `=:=` evidence:
```scala
def innerJoin[U, K1, K2](ds2: Dataset[U])(f: T => K1, g: U => K2)
  (implicit eq: K1 =:= K2,
   e1: Encoder[(K2, T)],
   e2: Encoder[(K2, U)],
   e3: Encoder[(T, U)]): Dataset[(T, U)] = {
  val ds1_ = ds1.map(x => (eq(f(x)), x))
  // ... rest the same as before ...
}
```
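Spelled out in full, the fixed extension method could look like the sketch below, assembled from the snippets above; the `=:=` evidence is applied to `f`'s result so both datasets are keyed by `K2`:

```scala
import org.apache.spark.sql.{Dataset, Encoder}

object JoinTest {
  implicit class Joins[T](ds1: Dataset[T]) {
    def innerJoin[U, K1, K2](ds2: Dataset[U])(f: T => K1, g: U => K2)
      (implicit eq: K1 =:= K2,
       e1: Encoder[(K2, T)],
       e2: Encoder[(K2, U)],
       e3: Encoder[(T, U)]): Dataset[(T, U)] = {
      // eq converts f's K1 result to K2, so both sides
      // carry keys of the same concrete type.
      val ds1_ = ds1.map(x => (eq(f(x)), x))
      val ds2_ = ds2.map(x => (g(x), x))
      ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1"))
        .map(x => (x._1._2, x._2._2))
    }
  }
}
```

With this signature, `ds1.innerJoin(ds2)(_.a, _.b)` no longer type-checks: `K1` is inferred as `String`, `K2` as `Int`, and there is no `String =:= Int` instance, so the mismatch is caught at compile time rather than at runtime.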