代码之家  ›  专栏  ›  技术社区  ›  Frank

Spark Streaming kafka偏移量管理

  •  2
  • Frank  · 技术社区  · 6 年前

    我一直在做spark流媒体工作,通过卡夫卡消费和生成数据。我使用directDstream,所以我必须自己管理偏移量,我们采用redis来写入和读取偏移量。现在有一个问题,当我启动我的客户端时,我的客户端需要从redis获取偏移量,而不是卡夫卡本身存在的偏移量。如何编写代码?现在,我已经编写了以下代码:

       kafka_stream = KafkaUtils.createDirectStream(
        ssc,
        topics=[config.CONSUME_TOPIC, ],
        kafkaParams={"bootstrap.servers": config.CONSUME_BROKERS,
                     "auto.offset.reset": "largest"},
        fromOffsets=read_offset_range(config.OFFSET_KEY))
    

    但我认为fromOffset是spark流媒体客户端启动时的值(来自redis),而不是在其运行期间。谢谢你的帮助。

    1 回复  |  直到 5 年前
        1
  •  0
  •   user3689574    6 年前

    如果我理解正确,您需要手动设置偏移量。我就是这样做的:

    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    from pyspark.streaming.kafka import TopicAndPartition
    
    stream = StreamingContext(sc, 120) # 120 second window
    
    kafkaParams = {"metadata.broker.list":"1:667,2:6667,3:6667"}
    kafkaParams["auto.offset.reset"] = "smallest"
    kafkaParams["enable.auto.commit"] = "false"
    
    topic = "xyz"
    topicPartion = TopicAndPartition(topic, 0)
    fromOffset = {topicPartion: long(PUT NUMERIC OFFSET HERE)}
    
    kafka_stream = KafkaUtils.createDirectStream(stream, [topic], kafkaParams, fromOffsets = fromOffset)