代码之家  ›  专栏  ›  技术社区  ›  user1870400

如何在读取卡夫卡的消息流时处理Avro消息?

  •  3
  • user1870400  · 技术社区  · 6 年前

    下面的代码读取来自卡夫卡的消息,这些消息都在Avro中,那么如何解析消息并将其放入Spark 2.2.0中的数据帧中呢?

    Dataset<Row> df = sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "topic1")
                .load();
    

    https://github.com/databricks/spark-avro 图书馆没有流媒体案例的例子。

    1 回复  |  直到 5 年前
        1
  •  2
  •   zero323 little_kid_pea    6 年前

    如何在Spark 2.2.0中解析消息并将其放入数据帧中?

    这是你的家庭练习,需要一些编码。

    https://github.com/databricks/spark-avro 图书馆没有流媒体案例的例子。

    有人告诉我(这里有几个问题),spark avro 支持Spark结构化流媒体(又名Spark Streams)。它可以很好地处理非流数据集,但不能处理流数据集。

    这就是为什么我写道,这是你必须自己编码的东西。

    可以如下所示(为了简单起见,我使用Scala):

    // Step 1. convert messages to be strings
    val avroMessages = df.select($"value" cast "string")
    
    // Step 2. Strip the avro layer off
    val from_avro = udf { (s: String) => ...processing here... }
    val cleanDataset = avroMessages.withColumn("no_avro_anymore", from_avro($"value"))
    

    这需要开发 from_avro 自定义UDF,可以实现您想要的功能(类似于Spark使用 from_json 标准功能!)


    或者(用一种更高级的?/复杂的方法)编写自己的自定义流媒体 Source 用于Kafka中Avro格式的数据集,并改用它。

    Dataset<Row> df = sparkSession.readStream()
                .format("avro-kafka") // <-- HERE YOUR CUSTOM Source
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "topic1")
                .load();
    

    我还没找到可行的方法 avro-kafka 格式为。这确实可行,但同时做了两件事,即阅读卡夫卡的作品 正在进行Avro转换,但我不相信这是Spark结构化流媒体和一般软件工程中的做法。我希望有一种方法可以一种接一种地应用一种格式,但在Spark 2.2.1中这是不可能的(也不计划在2.3中使用)。

    因此,我认为UDF是目前最好的解决方案。


    只是一个想法,你也可以写一个自定义卡夫卡 Deserializer 这将在Spark加载消息时进行反序列化。