代码之家  ›  专栏  ›  技术社区  ›  MaatDeamon

Spark结构化流存储流+行+编码器问题

  •  2
  • MaatDeamon  · 技术社区  · 6 年前

    我正试着用spark结构化流媒体在我的本地机器上运行一些测试。

    在批处理模式下,下面是我要处理的行:

    val recordSchema = StructType(List(StructField("Record", MapType(StringType, StringType), false)))
    val rows         = List(
        Row(
          Map("ID" -> "1",
            "STRUCTUREID" -> "MFCD00869853",
            "MOLFILE" -> "The MOL Data",
            "MOLWEIGHT" -> "803.482",
            "FORMULA" -> "C44H69NO12",
            "NAME" -> "Tacrolimus",
            "HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
            "SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
            "METABOLISM" -> "The metabolism 500"
           )),
        Row(
          Map("ID" -> "2",
            "STRUCTUREID" -> "MFCD00869854",
            "MOLFILE" -> "The MOL Data",
            "MOLWEIGHT" -> "603.482",
            "FORMULA" -> "",
            "NAME" -> "Tacrolimus2",
            "HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
            "SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
            "METABOLISM" -> "The metabolism 500"
          ))
      )
    val df  = spark.createDataFrame(spark.sparkContext.parallelize(rows), recordSchema)
    

    现在我尝试在流模式下使用MemoryStream进行测试。我添加了以下内容:

    implicit val ctx = spark.sqlContext
    val intsInput = MemoryStream[Row]
    

    但是编译器抱怨如下:

    找不到参数证据$1的隐式:编码器[行]

    我还看到,如果添加以下导入,则错误将消失:

    导入spark.implicits_

    实际上,我现在得到了以下警告而不是一个错误

    我不太了解编码器机制,如果有人能解释给我如何不使用这些隐含的话,我会很感激的。原因是,在一本书中,当涉及到从行创建数据帧时,我红色了下面的内容。

    val myManualSchema = new StructType(Array(
      new StructField("some", StringType, true),
      new StructField("col", StringType, true),
      new StructField("names", LongType, false)))
    val myRows = Seq(Row("Hello", null, 1L))
    val myRDD = spark.sparkContext.parallelize(myRows)
    val myDf = spark.createDataFrame(myRDD, myManualSchema)
    myDf.show()
    

    然后作者接着说:

    在Scala中,我们还可以利用 序列类型。这对空类型不太好,所以 必须推荐用于生产用例。

    val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")
    

    如果有人在我使用隐式时能花时间解释我的场景中发生的事情,如果这样做是比较安全的,或者有没有一种方法可以更显式地做而不导入隐式。

    最后,如果有人能给我介绍一个关于编码器和火花类型映射的好文档,那就太好了。

    编辑1

      implicit val ctx = spark.sqlContext
      import spark.implicits._
      val rows = MemoryStream[Map[String,String]]
      val df = rows.toDF()
    

    尽管我的问题是我对自己的工作没有信心。在我看来,在某些情况下,我需要创建一个数据集,以便能够使用to DF转换在DF[ROW]中转换它。我知道使用DS是类型安全的,但比DF慢。那么,为什么要使用数据集这个中介呢?这不是我第一次在Spark结构化流媒体中看到这种情况。如果有人能帮我,那就太好了。

    0 回复  |  直到 5 年前
        1
  •  3
  •   Rodrigo Hernández Mota    5 年前

    我建议你用斯卡拉的 case classes 用于数据建模。

    final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)
    

    现在你可以 List 属于 Product

      val inMemoryRecords: List[Product] = List(
        Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
        Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
        Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
      )
    

    这个 structured streaming API 使用广为人知的 Dataset[T] 抽象。粗略地说,你只需要担心三件事:

    • Source :源可以生成输入数据流,我们可以将其表示为 Dataset[Input] Input 它将被追加到这个无限的数据集中。您可以随心所欲地操作数据(例如。 数据集[输入] Dataset[Output] ).
    • StreamingQueries Sink :查询生成一个结果表,该表在每个触发间隔从源更新。更改被写入称为接收器的外部存储器。
    • Output modes :有不同的模式可以将数据写入接收器:完整模式、附加模式和更新模式。

    假设你想知道分子量大于200单位的产物。

    正如您所说,使用批处理API相当简单和直接:

    // Create an static dataset using the in-memory data
    val staticData: Dataset[Product] = spark.createDataset(inMemoryRecords)
    
    // Processing...
    val result: Dataset[Product] = staticData.filter(_.weight > 200)
    
    // Print results!
    result.show()
    

    使用流式API时,只需定义 source 以及 sink 作为额外的一步。在这个例子中,我们可以使用 MemoryStream 以及 console

    // Create an streaming dataset using the in-memory data (memory source)
    val productSource = MemoryStream[Product]
    productSource.addData(inMemoryRecords)
    
    val streamingData: Dataset[Product] = productSource.toDS()
    
    // Processing...
    val result: Dataset[Product] = streamingData.filter(_.weight > 200)
    
    // Print results by using the console sink. 
    val query: StreamingQuery = result.writeStream.format("console").start()
    
    // Stop streaming
    query.awaitTermination(timeoutMs=5000)
    query.stop()
    
    

    请注意 staticData streamingData 具有准确的类型签名(即。, Dataset[Product] ). 这允许我们应用相同的处理步骤,而不必使用批处理或流式API。您还可以考虑实现泛型方法 def processing[In, Out](inputData: Dataset[In]): Dataset[Out] = ???

    完整代码示例:

    object ExMemoryStream extends App {
    
      // Boilerplate code...
      val spark: SparkSession = SparkSession.builder
        .appName("ExMemoryStreaming")
        .master("local[*]")
        .getOrCreate()
    
      spark.sparkContext.setLogLevel("ERROR")
    
      import spark.implicits._
      implicit val sqlContext: SQLContext = spark.sqlContext
    
      // Define your data models 
      final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)
    
      // Create some in-memory instances
      val inMemoryRecords: List[Product] = List(
        Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
        Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
        Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
      )
    
      // Defining processing step
      def processing(inputData: Dataset[Product]): Dataset[Product] =
        inputData.filter(_.weight > 200)
    
      // STATIC DATASET
      val datasetStatic: Dataset[Product] = spark.createDataset(inMemoryRecords)
    
      println("This is the static dataset:")
      processing(datasetStatic).show()
    
      // STREAMING DATASET
      val productSource = MemoryStream[Product]
      productSource.addData(inMemoryRecords)
    
      val datasetStreaming: Dataset[Product] = productSource.toDS()
    
      println("This is the streaming dataset:")
      val query: StreamingQuery = processing(datasetStreaming).writeStream.format("console").start()
      query.awaitTermination(timeoutMs=5000)
    
      // Stop query and close Spark
      query.stop()
      spark.close()
    
    }