
How to read multiple types of JSON from one topic in Kafka with Spring Boot

  • user3310115  · 5 years ago

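    I have a topic from which I receive different types of JSON. However, when the consumer tries to read a message, I get an exception. I tried adding separate bean names, but that did not work. It seems that every listener reads from the topic and tries to convert each message to its own type, whatever the message actually contains. Is there a way to specify that a particular factory should be enabled only for a particular input type? Is there any other way to solve this?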

    Error

    org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.lte.assessment.AssessmentAttemptRequest] to [com.lte.assessmentanalytics.data.SiteLevelAnalyticsRequest] for GenericMessage [payload=com.lte.assessment.AssessmentAttemptRequest@68eb637f, headers={kafka_offset=22, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@252d8ffb, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedTimestamp=1546117529267}]

    Config

    @EnableKafka
    @Configuration
    public class KafkaConfig {
        static Map<String, Object> config = new HashMap();
    
        static {
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        }
    
    
        @Bean
        public ConsumerFactory<String, AssessmentAttemptRequest> assessmentAttemptDetailsEntityConsumerFactory() {
            JsonDeserializer<AssessmentAttemptRequest> deserializer = new JsonDeserializer<>();
            deserializer.addTrustedPackages("com.lte.assessment.assessments");
            return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
        }
    
        @Bean(name="aaKafkaListenerFactory")
        public ConcurrentKafkaListenerContainerFactory aaKafkaListenerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, AssessmentAttemptRequest> factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(assessmentAttemptDetailsEntityConsumerFactory());
            return factory;
        }
    
        @Bean
        public ConsumerFactory<String, AssessmentQuestionAnalyticsEntity> assessmentQuestionAnalyticssEntityConsumerFactory() {
            JsonDeserializer<AssessmentQuestionAnalyticsEntity> deserializer = new JsonDeserializer<>();
            deserializer.addTrustedPackages("com.lte.assessment.assessments");
            return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
        }
    
        @Bean(name="aqKafkaListenerFactory")
        public ConcurrentKafkaListenerContainerFactory aqKafkaListenerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, AssessmentQuestionAnalyticsEntity> factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(assessmentQuestionAnalyticssEntityConsumerFactory());
            return factory;
        }
    
        @Bean
        public ConsumerFactory<String, SiteLevelAnalyticsEntity> siteLevelAnalyticsEntityConsumerFactory() {
            JsonDeserializer<SiteLevelAnalyticsEntity> deserializer = new JsonDeserializer<>();
            deserializer.addTrustedPackages("com.lte.assessment.assessments");
            return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
        }
    
        @Bean("slaKafkaListenerFactory")
        public ConcurrentKafkaListenerContainerFactory slaKafkaListenerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, SiteLevelAnalyticsEntity> factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(siteLevelAnalyticsEntityConsumerFactory());
            return factory;
        }
    }
    

    @Service
    public class TopicObserver implements
            ConsumerSeekAware.ConsumerSeekCallback,ConsumerSeekAware{
    
        @Autowired
        private AssessmentAttemptService assessmentAttemptService;
    
        @Autowired
        private AssessmentQuestionService assessmentQuestionService;
    
        @Autowired
        private SiteLevelAnalyticsService siteLevelAnalyticsService;
    
        private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();
    
        @KafkaListener(topics = "ltetopic", groupId = "group_id", containerFactory = "aaKafkaListenerFactory")
        public void consumeAttemptDetails(AssessmentAttemptRequest request) {
            assessmentAttemptService.storeAttempDetails(request);
        }
    
        @KafkaListener(topics = "ltetopic", groupId = "group_id", containerFactory = "aqKafkaListenerFactory")
        public void setAssessmentQeustionAnalytics(AssessmentQuestionRequest request) {
            assessmentQuestionService.storeQuestionDetails(request);
        }
    
        @KafkaListener(topics = "ltetopic", groupId = "group_id", containerFactory = "slaKafkaListenerFactory")
        public void siteLevelAnalytics(SiteLevelAnalyticsRequest request) {
            siteLevelAnalyticsService.storeSiteLevelDetailsDetails(request);
        }
    }
    
    2 answers

  • Answer 1  · Suraj Rao Soumya Sengupta  · 5 years ago
    @Bean
    public ConsumerFactory<String, SiteLevelAnalyticsEntity> siteLevelAnalyticsEntityConsumerFactory() {
        JsonDeserializer<SiteLevelAnalyticsEntity> deserializer = new JsonDeserializer<>();
        deserializer.addTrustedPackages("com.lte.assessment.assessments");
        return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
    }
    

    In the consumer factory where you specify SiteLevelAnalyticsEntity for the JsonDeserializer, please define: deserializer.addTrustedPackages("com.lte.assessment.SiteLevelAnalyticsEntity");

  • Answer 2  · Shankar  · 5 years ago

    @Deadpool is right. If you want a simpler solution, consume the messages as String JSON payloads and deserialize them into objects manually.

    @Bean
    public ConsumerFactory<String, String> createConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // kafkaEmbedded() is not defined in this snippet; substitute your own broker list.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
          kafkaEmbedded().getBrokersAsString());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Keys and values are both read as plain strings.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(createConsumerFactory());
        return factory;
    }
    

    In your listener, consume the message as a String.

    @KafkaListener(id = "foo", topics = YOUR_TOPIC)
    public void listen(String json) {
        // Convert to an object here.
    }
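
    One possible way to fill in the conversion step left open in the listener above is to look at a discriminator in the raw JSON and hand the string to a type-specific handler. The sketch below is only an illustration under assumptions that are not in the original posts: it assumes every message carries a top-level "type" string field, and the class name PayloadRouter and the type values "attempt" and "siteLevel" are hypothetical. To stay dependency-free it extracts the field with a regular expression; in a real application a JSON library would be the more robust choice, both for reading the discriminator and for building the target object.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Routes raw JSON strings to per-type handlers based on a discriminator field. */
final class PayloadRouter {
    // Assumption: each message has a top-level string field named "type".
    private static final Pattern TYPE_FIELD =
            Pattern.compile("\"type\"\\s*:\\s*\"([^\"]+)\"");

    // Maps a discriminator value to the handler that knows how to process that payload.
    private final Map<String, Consumer<String>> handlers = new LinkedHashMap<>();

    /** Registers a handler for one discriminator value; returns this for chaining. */
    PayloadRouter register(String type, Consumer<String> handler) {
        handlers.put(type, handler);
        return this;
    }

    /** Extracts the first "type" value found in the string, if any. */
    static Optional<String> typeOf(String json) {
        Matcher m = TYPE_FIELD.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** Returns true when a matching handler was found and invoked. */
    boolean route(String json) {
        Optional<String> type = typeOf(json);
        if (!type.isPresent()) {
            return false; // no discriminator, nothing to dispatch on
        }
        Consumer<String> handler = handlers.get(type.get());
        if (handler == null) {
            return false; // unknown type, caller decides whether to log or skip
        }
        handler.accept(json);
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical handlers; real ones would deserialize and call a service.
        PayloadRouter router = new PayloadRouter()
                .register("attempt", json -> System.out.println("attempt: " + json))
                .register("siteLevel", json -> System.out.println("site level: " + json));

        router.route("{\"type\":\"attempt\",\"score\":7}");
        router.route("{\"type\":\"unknown\"}"); // no handler registered, ignored
    }
}
```

    Inside the listener, the body would then reduce to a single call such as router.route(json), with each registered handler doing the actual deserialization and calling the matching service.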