代码之家  ›  专栏  ›  技术社区  ›  delalma

制作人正在出版卡夫卡,但无法阅读Spark结构化流媒体

  •  0
  • delalma  · 技术社区  · 3 年前

    我使用卡夫卡发布推特,它运行正常,因为我能够看到以下命令的回声

    bin/kafka-console-consumer.sh --bootstrap-server xxx.xxx.xx.xxx:9092 --topic trump --from-beginning
    

    但是当我尝试使用以下代码使用结构化流媒体时

    if __name__ == "__main__":
    
        spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()
    
        source_df = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "xxx.xxx.xx.xxx:9092") \
            .option("subscribe", "tweets") \
            .option("startingOffsets", "latest") \
            .load()
    
        query = source_df \
            .writeStream \
            .outputMode("append") \
            .format("console") \
            .start()
    

    +----+--------------------+------+---------+------+--------------------+-------------+
    | key|               value| topic|partition|offset|           timestamp|timestampType|
    +----+--------------------+------+---------+------+--------------------+-------------+
    |null|[52 54 20 40 70 7...|tweets|        0| 45724|2021-03-17 12:57:...|            0|
    |null|[23 52 57 52 49 2...|tweets|        0| 45725|2021-03-17 12:57:...|            0|
    |null|[52 54 20 40 54 7...|tweets|        0| 45726|2021-03-17 12:57:...|            0|
    |null|[52 54 20 40 44 6...|tweets|        0| 45727|2021-03-17 12:57:...|            0|
    |null|[40 42 42 43 50 6...|tweets|        0| 45728|2021-03-17 12:57:...|            0|
    |null|[40 4C 6F 72 64 5...|tweets|        0| 45729|2021-03-17 12:57:...|            0|
    |null|[41 6E 6E 6F 75 6...|tweets|        0| 45730|2021-03-17 12:57:...|            0|
    |null|[42 69 74 63 6F 6...|tweets|        0| 45731|2021-03-17 12:57:...|            0|
    |null|[40 65 72 69 6B 7...|tweets|        0| 45732|2021-03-17 12:57:...|            0|
    |null|[74 68 65 20 6D 6...|tweets|        0| 45733|2021-03-17 12:57:...|            0|
    |null|[52 54 20 40 6D 6...|tweets|        0| 45734|2021-03-17 12:57:...|            0|
    |null|[52 54 20 40 6D 6...|tweets|        0| 45735|2021-03-17 12:57:...|            0|
    |null|[40 42 54 43 54 4...|tweets|        0| 45736|2021-03-17 12:57:...|            0|
    |null|[52 54 20 40 49 6...|tweets|        0| 45737|2021-03-17 12:57:...|            0|
    |null|[52 54 20 40 63 6...|tweets|        0| 45738|2021-03-17 12:57:...|            0|
    |null|[42 75 20 6F 6C 6...|tweets|        0| 45739|2021-03-17 12:57:...|            0|
    |null|[40 5F 43 72 79 7...|tweets|        0| 45740|2021-03-17 12:57:...|            0|
    |null|[40 57 69 6E 66 6...|tweets|        0| 45741|2021-03-17 12:57:...|            0|
    |null|[4D 79 20 72 65 6...|tweets|        0| 45742|2021-03-17 12:57:...|            0|
    |null|[52 54 20 40 6F 6...|tweets|        0| 45743|2021-03-17 12:57:...|            0|
    +----+--------------------+------+---------+------+--------------------+-------------+
    only showing top 20 rows
    

    任何帮助了解情况的人都将不胜感激。

    1 回复  |  直到 3 年前
        1
  •  1
  •   Michael Heil    3 年前

    默认情况下,数据(列 key value )存储在Kafka中的数据被序列化为字符串。

    在看 Structured Streaming + Kafka Integration Guide 钥匙 binary :

    enter image description here

    source_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
            .writeStream \
            .outputMode("append") \
            .format("console") \
            .start()
    
    推荐文章