
Not able to send GenericRecord data from Kafka Producer in AVRO format

  •  0
  •  tryingSpark  ·  6 years ago

    I am using confluent-oss-5.0.0-2.11. My Kafka Producer code is:

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class AvroProducer {
        public static void main(String[] args) throws ExecutionException, InterruptedException {

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("ZOOKEEPER_HOST", "localhost"); // not a producer setting; the client ignores it
            //props.put("acks", "all");
            props.put("retries", 0);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
            props.put("schema.registry.url", "http://localhost:8081");
            String topic = "confluent-new";

            Schema.Parser parser = new Schema.Parser();
            // I will get the schema string below from the SCHEMA REGISTRY
            Schema schema = parser.parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"userName\",\"type\":\"string\"},{\"name\":\"uID\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\",\"default\":\"ABC\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0},{\"name\":\"location\",\"type\":\"string\",\"default\":\"Noida\"}]}");

            Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
            GenericRecord record = new GenericData.Record(schema);
            record.put("uID", "06080000");
            record.put("userName", "User data10");
            record.put("company", "User data10");
            record.put("age", 12);
            record.put("location", "User data10");

            ProducerRecord<String, GenericRecord> recordData = new ProducerRecord<String, GenericRecord>(topic, "ip", record);
            producer.send(recordData);

            System.out.println("Message Sent");
        }
    }

    The producer code seems to run: I can see Message Sent on the console.

    The Kafka Consumer code is:

    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class AvroConsumer {
        public static void main(String[] args) throws ExecutionException, InterruptedException {

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("ZOOKEEPER_HOST", "localhost");
            props.put("acks", "all");  // producer-side setting; the consumer ignores it
            props.put("retries", 0);   // producer-side setting; the consumer ignores it
            props.put("group.id", "consumer1");
            props.put("auto.offset.reset", "latest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
            props.put("schema.registry.url", "http://localhost:8081");
            String topic = "confluent-new";

            KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                ConsumerRecords<String, GenericRecord> recs = consumer.poll(10000);
                for (ConsumerRecord<String, GenericRecord> rec : recs) {
                    System.out.printf("{AvroUtilsConsumerUser}: Received [key= %s, value= %s]\n", rec.key(), rec.value());
                }
            }
        }
    }

    I am not able to see the messages (data) on the Kafka consumer side. I also checked the confluent-new topic, and it is not being updated. It seems something is wrong with the producer code. Any pointers would be helpful.

    Meanwhile, the producer code below is working, where the POJO, i.e. User, was generated with avro-tools.

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class AvroProducer {
        public static void main(String[] args) throws ExecutionException, InterruptedException {

            Properties props = new Properties();
            /*kafkaParams.put("auto.offset.reset", "smallest");
            kafkaParams.put("ZOOKEEPER_HOST", "bihdp01");*/
            props.put("bootstrap.servers", "localhost:9092");
            props.put("ZOOKEEPER_HOST", "localhost");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
            props.put("schema.registry.url", "http://localhost:8081");
            String topic = "confluent-new";

            // User is the POJO generated from the Avro schema by avro-tools
            Producer<String, User> producer = new KafkaProducer<String, User>(props);
            User user = new User();
            user.setUID("0908");
            user.setUserName("User data10");
            user.setCompany("HCL");
            user.setAge(20);
            user.setLocation("Noida");
            ProducerRecord<String, User> record = new ProducerRecord<String, User>(topic, (String) user.getUID(), user);
            producer.send(record).get();
            System.out.println("Sent");
        }
    }

    P.S. My requirement is to send JSON data received from a source KAFKA topic to a destination KAFKA topic in AVRO format. First I infer the AVRO schema from the received JSON data using AVRO4S and register that schema with the Schema Registry. Next I pull the data out of the received JSON, populate a GenericRecord instance, and send this GenericRecord instance to the Kafka topic using KafkaAvroSerializer. On the consumer side I will use KafkaAvroDeserializer to deserialize the received AVRO data.
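    One way to do the JSON-to-GenericRecord step could be Avro's own JSON decoder (a minimal sketch, not the exact code from my pipeline; it assumes the incoming JSON matches Avro's JSON encoding of the registered schema, which holds here because the User schema has no union fields):

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.JsonDecoder;

    public class JsonToRecord {
        // Decode a JSON string into a GenericRecord using the schema
        // fetched from the Schema Registry.
        static GenericRecord toRecord(Schema schema, String json) throws IOException {
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
            return reader.read(null, decoder);
        }
    }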

    2 Answers  ·  6 years ago
        1
  •  1
  •   sparkingmyself    6 years ago

    Please try adding get() in the first producer:

    producer.send(recordData).get();
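
    send() is asynchronous and only enqueues the record in the producer's client-side buffer, so your main method can exit before anything reaches the broker; get() on the returned Future blocks until the broker acknowledges the write. A minimal sketch of inspecting that acknowledgement (requires import org.apache.kafka.clients.producer.RecordMetadata; producer and recordData are the ones from your question):

    // Blocks until the broker acknowledges, then reports where the record landed.
    RecordMetadata meta = producer.send(recordData).get();
    System.out.printf("written to %s-%d @ offset %d%n",
            meta.topic(), meta.partition(), meta.offset());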
    
        2
  •  1
  •   tryingSpark    6 years ago

    While searching for a solution, I tried Thread.sleep(1000) and it fixed my problem. I also tried producer.send(record).get(), which also solved it. After going through the Documentation I came across the snippet below, which hinted at the solution.

    // When you're finished producing records, you can flush the producer
    // to ensure it has all been written to Kafka and then close the
    // producer to free its resources.

    finally {
        producer.flush();
        producer.close();
    }
    

    Hence this is the best way to solve this problem.
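
    Applied to the first producer, the fix is wrapping the send in try/finally (a minimal sketch; props, topic, and recordData are the same as in the question):

    Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
    try {
        producer.send(recordData);  // asynchronous: only buffered client-side
        System.out.println("Message Sent");
    } finally {
        producer.flush();           // block until all buffered records are written to Kafka
        producer.close();           // free network threads and sockets
    }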