如何在Spark 2.2.0中解析消息并将其放入数据帧中?
这是你的家庭练习,需要一些编码。
这
https://github.com/databricks/spark-avro
图书馆没有流媒体案例的例子。
有人告诉我(这里有几个问题),spark avro
不
支持Spark结构化流媒体(又名Spark Streams)。它可以很好地处理非流数据集,但不能处理流数据集。
这就是为什么我写道,这是你必须自己编码的东西。
可以如下所示(为了简单起见,我使用Scala):
// Step 1. convert messages to be strings
val avroMessages = df.select($"value" cast "string")
// Step 2. Strip the avro layer off
val from_avro = udf { (s: String) => ...processing here... }
val cleanDataset = avroMessages.withColumn("no_avro_anymore", from_avro($"value"))
这需要开发
from_avro
自定义UDF,可以实现您想要的功能(类似于Spark使用
from_json
标准功能!)
或者(用一种更高级的?/复杂的方法)编写自己的自定义流媒体
Source
用于Kafka中Avro格式的数据集,并改用它。
Dataset<Row> df = sparkSession.readStream()
.format("avro-kafka") // <-- HERE YOUR CUSTOM Source
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
我还没找到可行的方法
avro-kafka
格式为。这确实可行,但同时做了两件事,即阅读卡夫卡的作品
和
正在进行Avro转换,但我不相信这是Spark结构化流媒体和一般软件工程中的做法。我希望有一种方法可以一种接一种地应用一种格式,但在Spark 2.2.1中这是不可能的(也不计划在2.3中使用)。
因此,我认为UDF是目前最好的解决方案。
只是一个想法,你也可以写一个自定义卡夫卡
Deserializer
这将在Spark加载消息时进行反序列化。