The data is generated by a Scala script:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val kafkaParams = new Properties()
kafkaParams.put("bootstrap.servers", "localhost:9092")
kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
kafkaParams.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
kafkaParams.put("group.id", "test_luca") // group.id is a consumer property; the producer ignores it

// Kafka producer shared by both simulated sources
val producer = new KafkaProducer[String, String](kafkaParams)

// Source 1: publish a timestamped CSV line to topic_s1 every second
val s1 = new java.util.Timer()
val tasks1 = new java.util.TimerTask {
  def run() = {
    val date = new java.util.Date
    val date2 = date.getTime()
    val send = "" + date2 + ", 45.1234, 12.5432, 4.5, 3.0"
    val data = new ProducerRecord[String, String]("topic_s1", send)
    producer.send(data)
  }
}
s1.schedule(tasks1, 1000L, 1000L)

// Source 2: publish a similar CSV line to topic_s2 every two seconds
val s2 = new java.util.Timer()
val tasks2 = new java.util.TimerTask {
  def run() = {
    val date = new java.util.Date
    val date2 = date.getTime()
    val send = "" + date2 + ", 1.111, 9.999, 10.4, 10.0"
    val data = new ProducerRecord[String, String]("topic_s2", send)
    producer.send(data)
  }
}
s2.schedule(tasks2, 2000L, 2000L)
I need to test Kafka's performance in some specific scenarios. In one of them, I have another script that consumes the data from topics "topic_s1" and "topic_s2", elaborates it, and produces new data to two different topics (topic_s1b and topic_s2b). This elaborated data is then consumed by an Apache Spark Streaming script.
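The middleware script itself is not shown; a minimal sketch of what it could look like, assuming it simply reads each line from topic_s1/topic_s2, applies some elaboration, and forwards the result to topic_s1b/topic_s2b with a plain KafkaConsumer/KafkaProducer pair (the "middleware" group id and the elaborate function are placeholders, not taken from the original scripts):

import java.util.{Arrays, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", "localhost:9092")
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("group.id", "middleware") // placeholder group id, distinct from the Spark consumer's group

val producerProps = new Properties()
producerProps.put("bootstrap.servers", "localhost:9092")
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val consumer = new KafkaConsumer[String, String](consumerProps)
val producer = new KafkaProducer[String, String](producerProps)
consumer.subscribe(Arrays.asList("topic_s1", "topic_s2"))

// placeholder for the real elaboration logic
def elaborate(raw: String): String = raw + ", elaborated"

while (true) {
  val records = consumer.poll(500L)
  records.asScala.foreach { r =>
    // route records from topic_s1 to topic_s1b and from topic_s2 to topic_s2b
    val outTopic = if (r.topic == "topic_s1") "topic_s1b" else "topic_s2b"
    producer.send(new ProducerRecord[String, String](outTopic, elaborate(r.value)))
  }
}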
If I use the full configuration (one Kafka producer with two topics, the "middleware" script that consumes the data provided by the Kafka producer, elaborates it and produces new data to the new topics, and one Spark script that consumes the data from the new topics), the Spark Streaming script does not start; it just keeps logging:
INFO AbstractCoordinator: (Re-)joining group test_luca
Any suggestions?
UPDATE: the Spark script:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

val sparkConf = new SparkConf().setAppName("SparkScript").set("spark.driver.allowMultipleContexts", "true").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(4))

// Case classes matching the JSON structure of the incoming records
case class Thema(name: String, metadata: JObject)
case class Tempo(unit: String, count: Int, metadata: JObject)
case class Spatio(unit: String, metadata: JObject)
case class Stt(spatial: Spatio, temporal: Tempo, thematic: Thema)
case class Location(latitude: Double, longitude: Double, name: String)
case class Data(location: Location, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
case class Sensor(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Data, stt: Stt)
case class Datas(location: Location, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
case class Sensor2(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Datas, stt: Stt)

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "group.id" -> "test_luca",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics1 = Array("topics1")
val topics2 = Array("topics2")

// One direct stream per topic, both using the same consumer group
val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams))
val stream2 = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics2, kafkaParams))

// Deserialize the JSON payloads into the case classes
val s1 = stream.map { record =>
  implicit val formats = DefaultFormats
  parse(record.value).extract[Sensor]
}
val s2 = stream2.map { record =>
  implicit val formats = DefaultFormats
  parse(record.value).extract[Sensor2]
}

val f1 = s1.map { x => x.sensor_name }
f1.print()
val f2 = s2.map { x => x.sensor_name }
f2.print()

ssc.start()
ssc.awaitTermination()
Thanks,
Luca