
AnalysisException: Queries with streaming sources must be executed with writeStream.start()

  •  cscan ssice  ·  asked 6 years ago

    I am getting an exception telling me that I need to start a stream before I can use it. However, the stream is being started. What is wrong with this setup?

    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServers)
      .option("subscribe", "inputTopic")
      .option("startingOffsets", "earliest")
      .load
      .selectExpr(deserializeKeyExpression, deserializeValueExpression)
      .select("value.*")
      .withColumn("date", to_timestamp(from_unixtime(col("date"))))
      .transform(model.transform)
      .select(col("id") as "key", func(col(config.probabilityCol)) as "value.prediction")
      .selectExpr(serializeKeyExpression, serializeValueExpression)
      .writeStream
      .outputMode("update")
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServers)
      .option("checkpointLocation", "checkpoint")
      .option("topic", "outputTopic")
      .start
    

    The exception is as follows:

    Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
    kafka
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        ...
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
        at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3249)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2491)
        at org.apache.spark.sql.Dataset.first(Dataset.scala:2498)
        at org.apache.spark.ml.feature.VectorAssembler.first$lzycompute$1(VectorAssembler.scala:57)
        at org.apache.spark.ml.feature.VectorAssembler.org$apache$spark$ml$feature$VectorAssembler$$first$1(VectorAssembler.scala:57)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply$mcI$sp(VectorAssembler.scala:88)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:88)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:58)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
        at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:58)
        at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:306)
        at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:306)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
        at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
        at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
        at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:306)
        at com.company.project.Stream$$anonfun$transform$1.apply(NewsRateJob.scala:65)
        at com.company.project.Stream$$anonfun$transform$1.apply(NewsRateJob.scala:65)
        at org.apache.spark.sql.Dataset.transform(Dataset.scala:2513)
        at com.company.project.Stream.transform(NewsRateJob.scala:65)
        at com.company.project.Stream.setupStream(NewsRateJob.scala:47)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:366)
        at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:311)
        at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:134)
        ... 18 common frames omitted
    

    I am familiar with the VectorAssembler issues in Spark 2.2, but I am using Spark 2.3.1.

    2 Answers
    Answer 1  ·  cscan ssice  ·  6 years ago

    This exception occurs because the model tries to access the stream's data before the stream has been started. In this case, the VectorAssembler calls first on the Dataset to determine the width of the vector.
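
    You can see the underlying restriction in isolation. A minimal sketch, using Spark's built-in rate test source as a stand-in (not the original Kafka setup): any eager, batch-style action on a streaming Dataset fails with this same error.

    // A streaming Dataset backed by the built-in "rate" source.
    val streamingDf = spark.readStream.format("rate").load()

    // Any batch-style action fails at analysis time:
    // org.apache.spark.sql.AnalysisException: Queries with streaming
    // sources must be executed with writeStream.start();;
    streamingDf.first()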

    Spark 2.3 does not automatically fix the interaction between the VectorAssembler and Structured Streaming; it simply provides a class (specifically, VectorSizeHint) that can be used together with the VectorAssembler on a streaming Dataset. Adding it to the stages of the pipeline solved the problem:

    import org.apache.spark.ml.feature.VectorSizeHint

    // Declare the vector's size up front so downstream stages do not
    // need to inspect the data to infer it.
    stages += new VectorSizeHint()
      .setInputCol(column)
      .setSize(size)
    

    Here is some documentation describing how to use it: https://docs.databricks.com/spark/latest/mllib/mllib-pipelines-and-stuctured-streaming.html
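
    For context, a minimal sketch of where the hint sits in a pipeline. The column names and size here are illustrative, not from the original job:

    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}

    // VectorSizeHint attaches size metadata to a vector-typed column, so
    // VectorAssembler can resolve its output schema without calling
    // first() on the (streaming) Dataset.
    val sizeHint = new VectorSizeHint()
      .setInputCol("embedding")   // hypothetical vector column
      .setSize(16)                // its known, fixed width

    val assembler = new VectorAssembler()
      .setInputCols(Array("embedding", "age")) // "age" is a plain numeric column
      .setOutputCol("features")

    val pipeline = new Pipeline().setStages(Array(sizeHint, assembler))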

    Note: this is not required for the OneHotEncoderEstimator.

    We ran into similar stack traces for a couple of other reasons as well: one because we were using the OneHotEncoder in the model (it needed to be updated to the OneHotEncoderEstimator, sketched below), and another because we were caching the pipeline (we removed the caching step).
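
    A minimal sketch of that swap, with hypothetical column names:

    import org.apache.spark.ml.feature.OneHotEncoderEstimator

    // Unlike the deprecated OneHotEncoder, which inspects the data at
    // transform time, OneHotEncoderEstimator is fitted once on batch
    // data; the fitted model carries the category counts and can then
    // be applied to a streaming Dataset.
    val encoder = new OneHotEncoderEstimator()
      .setInputCols(Array("categoryIndex"))
      .setOutputCols(Array("categoryVec"))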

    Answer 2  ·  user10391850  ·  6 years ago

    The exception occurs because you are trying to use an ML Transformer on a streaming Dataset. As explained in Spark Structured Streaming and Spark-Ml Regression, Spark does not support ML on Structured Streaming so far.

    You would have to rewrite your code to transform the data manually, without relying on the RDD and ML libraries.
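
    A minimal sketch of what transforming the data manually could look like, assuming model is a fitted linear model (e.g. a LinearRegressionModel) and streamingDf is the streaming Dataset; the column names are hypothetical:

    import org.apache.spark.ml.linalg.Vector
    import org.apache.spark.sql.functions.{col, udf}

    // Extract the learned parameters once, on the driver, and apply them
    // with a plain UDF, which works on streaming and batch Datasets alike.
    val coefficients = model.coefficients.toArray
    val intercept = model.intercept

    val predict = udf { (features: Vector) =>
      features.toArray.zip(coefficients).map { case (x, w) => x * w }.sum + intercept
    }

    val predictions = streamingDf.withColumn("prediction", predict(col("features")))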