代码之家  ›  专栏  ›  技术社区  ›  hoyland

Typesafe连接与Spark数据集的安全性低于我的预期

  •  0
  • hoyland  · 技术社区  · 6 年前

    在试图制造这个 solution Perform a typed join in Scala with Spark Datasets 我无意中碰到了一些我不明白的事情。

    在下面的测试中,签名 innerJoin def innerJoin[U, K](ds2: Dataset[U])(f: T => K, g: U => K)(implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) f: Foo => String g: Bar => Int . 我希望在编译时出现错误,但它编译得很好。为什么?

    java.lang.ClassNotFoundException: scala.Any 当Spark试图创建产品编码器时 ((K, Foo),(K, Bar)) Any 显示为 Int String .

    import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
    import org.scalatest.Matchers
    import org.scalatest.testng.TestNGSuite
    import org.testng.annotations.Test
    
    case class Foo(a: String)
    
    case class Bar(b: Int)
    
    class JoinTest extends TestNGSuite with Matchers {
      import JoinTest._
    
      @Test
      def testJoin(): Unit = {
        val spark = SparkSession.builder()
          .master("local")
          .appName("test").getOrCreate()
    
        import spark.implicits._
    
        val ds1 = spark.createDataset(Seq(Foo("a")))
        val ds2 = spark.createDataset(Seq(Bar(123)))
    
        val jd = ds1.innerJoin(ds2)(_.a, _.b)
    
        jd.count shouldBe 0
      }
     }
    
    object JoinTest {
      implicit class Joins[T](ds1: Dataset[T]) {
        def innerJoin[U, K](ds2: Dataset[U])(f: T => K, g: U => K)
         (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]): Dataset[(T, U)] = 
         {
           val ds1_ = ds1.map(x => (f(x), x))
           val ds2_ = ds2.map(x => (g(x), x))
           ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
        }
       }
    }
    
    1 回复  |  直到 6 年前
        1
  •  1
  •   Joe K    6 年前

    你说得对 Any String Int K . Function 在输出类型中是协变的。所以 Foo => String Foo => Any .

    解决这类问题的常用方法是使用两个类型参数和一个隐式 =:=

    def innerJoin[U, K1, K2](ds2: Dataset[U])(f: T => K1, g: U => K2)
      (implicit eq: K1 =:= K2, e1: Encoder[(K2, T)], e2: Encoder[(K2, U)], e3: Encoder[(T, U)]): Dataset[(T, U)] = 
      {
        val ds1_ = ds1.map(x => (eq(f(x)), x))
        ... rest the same as before ...