
Kafka (Re-)joining group persists with more than two topics

  •  0
  • lu_ferra  · 7 years ago

    The data is generated by a Scala script:

        import java.util.Properties
        import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

        val kafkaParams = new Properties()
        kafkaParams.put("bootstrap.servers", "localhost:9092")
        kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        kafkaParams.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        kafkaParams.put("group.id", "test_luca") // note: "group.id" is a consumer setting, the producer ignores it

        // Kafka producer shared by both timer tasks
        val producer = new KafkaProducer[String, String](kafkaParams)

        // Source list: one timer task per simulated sensor
        val s1 = new java.util.Timer()
        val tasks1 = new java.util.TimerTask {
            def run() = {
                val date = new java.util.Date
                val date2 = date.getTime()
                val send = "" + date2 + ", 45.1234, 12.5432, 4.5, 3.0"
                val data = new ProducerRecord[String, String]("topic_s1", send)
                producer.send(data)
            }
        }
        s1.schedule(tasks1, 1000L, 1000L)

        val s2 = new java.util.Timer()
        val tasks2 = new java.util.TimerTask {
            def run() = {
                val date = new java.util.Date
                val date2 = date.getTime()
                val send = "" + date2 + ", 1.111, 9.999, 10.4, 10.0"
                val data = new ProducerRecord[String, String]("topic_s2", send)
                producer.send(data)
            }
        }
        s2.schedule(tasks2, 2000L, 2000L)
    

    I need to test Kafka's performance in some specific scenarios. In one case I have another script that consumes the data from the topics "topic_s1" and "topic_s2", elaborates it, and then produces new data on different topics (topic_s1b and topic_s2b). This elaborated data is then consumed by an Apache Spark Streaming script.
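
    (The "middleware" script itself is not shown in the question. Below is a minimal sketch of what such a script could look like, assuming it simply forwards each raw record to the corresponding *_b topic; the consumer/producer settings, the group.id value and the forwarding logic are placeholders, not the actual script.)

        import java.util.{Arrays, Properties}
        import scala.collection.JavaConverters._
        import org.apache.kafka.clients.consumer.KafkaConsumer
        import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

        // Hypothetical middleware: consumes the raw readings from topic_s1/topic_s2
        // and republishes the elaborated values on topic_s1b/topic_s2b
        // (the elaboration itself is omitted here).
        val consumerProps = new Properties()
        consumerProps.put("bootstrap.servers", "localhost:9092")
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        consumerProps.put("group.id", "test_luca") // group.id of the middleware consumer

        val producerProps = new Properties()
        producerProps.put("bootstrap.servers", "localhost:9092")
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

        val consumer = new KafkaConsumer[String, String](consumerProps)
        val producer = new KafkaProducer[String, String](producerProps)
        consumer.subscribe(Arrays.asList("topic_s1", "topic_s2"))

        while (true) {
            val records = consumer.poll(1000L)
            for (record <- records.asScala) {
                // forward each record to the corresponding *_b topic
                val outTopic = if (record.topic == "topic_s1") "topic_s1b" else "topic_s2b"
                producer.send(new ProducerRecord[String, String](outTopic, record.value))
            }
        }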

    If I use the complete configuration (1 Kafka producer with 2 topics, the "middleware" script that consumes the data provided by the Kafka producer, elaborates it and produces new data on new topics, and 1 Spark script that consumes the data from the new topics), the Spark Streaming script does not start and keeps logging INFO AbstractCoordinator: (Re-)joining group test_luca

    Any suggestions?

    UPDATE: the Spark script:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse // json4s (jackson backend assumed)
    
    val sparkConf = new SparkConf().setAppName("SparkScript").set("spark.driver.allowMultipleContexts", "true").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    
    val ssc = new StreamingContext(sc, Seconds(4))
    
    // Case classes describing the JSON produced by the middleware script
    case class Thema(name: String, metadata: JObject)
    case class Tempo(unit: String, count: Int, metadata: JObject)
    case class Spatio(unit: String, metadata: JObject)
    case class Stt(spatial: Spatio, temporal: Tempo, thematic: Thema)
    case class Location(latitude: Double, longitude: Double, name: String)
    
    case class Data(location: Location, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
    case class Sensor(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Data, stt: Stt)
    
    case class Datas(location: Location, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
    case class Sensor2(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Datas, stt: Stt)
    
    // Consumer configuration shared by both direct streams (note: same group.id for both)
    val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
        "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
        "group.id" -> "test_luca",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    
    val topics1 = Array("topics1")
    val topics2 = Array("topics2")
    
    val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams))
    val stream2 = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics2, kafkaParams))
    
    // Deserialize each record's JSON value into the corresponding case class
    val s1 = stream.map(record => {
        implicit val formats = DefaultFormats
        parse(record.value).extract[Sensor]
    })
    val s2 = stream2.map(record => {
        implicit val formats = DefaultFormats
        parse(record.value).extract[Sensor2]
    })
    
    val f1 = s1.map { x => x.sensor_name }
    f1.print()
    val f2 = s2.map { x => x.sensor_name }
    f2.print()
    
    // Start the streaming context
    ssc.start()
    ssc.awaitTermination()
    

    Thanks, Luca

    1 reply  |  7 years ago
        1
  •  2
  •  GuangshengZuo  · 7 years ago

    Maybe you should change the group.id of the Spark Streaming script. I guess the consumer in your "middleware" script uses the same group.id as the Spark Streaming script's consumer, and then terrible things happen.

    In Kafka, the consumer group is the real subscriber of a topic; a consumer within the group is just a worker that splits the load. So in your case you should use different group.ids for the middleware script's consumer and the Spark Streaming script's consumer.
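
    For example, a minimal sketch of the change (the group names test_luca_middleware and test_luca_spark are just placeholder values; the rest of the kafkaParams map is the one already shown in the question):

        // In the middleware script's consumer configuration:
        consumerProps.put("group.id", "test_luca_middleware")   // its own consumer group

        // In the Spark Streaming script's kafkaParams:
        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "localhost:9092",
            "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
            "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
            "group.id" -> "test_luca_spark",   // different group => independent subscription to the topics
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> (false: java.lang.Boolean)
        )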