代码之家  ›  专栏  ›  技术社区  ›  Saffik

如何在scala中使用spark流将索引列附加到spark数据帧?

  •  0
  • Saffik  · 技术社区  · 4 年前

    我使用这样的东西:

    df.withColumn("idx", monotonically_increasing_id())
    

    但我得到了一个例外,因为它不受支持:

    Exception in thread "main" org.apache.spark.sql.AnalysisException: Expression(s): monotonically_increasing_id() is not supported with streaming DataFrames/Datasets;;
    
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:143)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:250)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)
    

    任何想法如何添加索引或行号列 火花流 scala中的数据帧?

    完整堆栈跟踪: https://justpaste.it/5bdqr

    0 回复  |  直到 4 年前
        1
  •  0
  •   Artem    4 年前

    在Spark streaming的流媒体计划中,有一些操作不能存在于任何地方,不幸的是,包括 monotonically_increasing_id() 请仔细核查这一事实 transformed1 你的问题中的错误失败了, here 是Spark源代码中关于此检查的参考:

    import org.apache.spark.sql.functions._ 
    
    val df = Seq(("one", 1), ("two", 2)).toDF("foo", "bar")
    val schema = df.schema
    df.write.parquet("/tmp/out")
    val input = spark.readStream.format("parquet").schema(schema).load("/tmp/out")
    
    val transformed1 = input.withColumn("id", monotonically_increasing_id())
    transformed1.writeStream.format("parquet").option("format", "append") .option("path", "/tmp/out2") .option("checkpointLocation", "/tmp/checkpoint_path").outputMode("append").start()
    
    import org.apache.spark.sql.expressions.Window
    val windowSpecRowNum = Window.partitionBy("foo").orderBy("foo")
    
    val transformed2 = input.withColumn("row_num", row_number.over(windowSpecRowNum))
    transformed2.writeStream.format("parquet").option("format", "append").option("path", "/tmp/out2").option("checkpointLocation", "/tmp/checkpoint_path").outputMode("append").start()
    

    我还尝试添加索引 Window DF中的一列- transformed2 在上面的快照中,它也失败了,但情况不同 error ):

    “流媒体不支持非基于时间的窗口 数据帧/数据集”

    您可以找到的Spark Streaming的所有不受支持的运算符检查 here -在Spark Batch中添加索引列的传统方法似乎在Spark Streaming中不起作用。