我有一个主题,可以从中接收不同类型的json。然而,当消费者试图阅读消息时,我似乎收到了一个异常。我试图添加额外的bean名称,但这不起作用。它似乎试图从主题中阅读,并试图转换为从主题中阅读的所有类型。是否有方法指定只应为特定输入类型启用特定工厂。有没有其他方法来解决这个问题。
错误
org.springframework.messaging.converter.MessageConversionException:消息转换异常:
[com.lte.assessment.AssessmentAttemptRequest]至
[com.lte.assessmentanalytics.data.siteLevelAnalyticsQuest]的
[有效负载=com.lte.assessment.AssessmentAttemptRequest@68eb637f,
标题{kafka_offset=22,
卡夫卡消费者=org.apache.kafka.clients.consumer.KafkaConsumer@252d8ffb,
kafka_timestampType=创建时间,kafka_receivedMessageKey=空,
卡夫卡收到时间戳=1546117529267}
配置
@EnableKafka
@Configuration
public class KafkaConfig {
static Map<String, Object> config = new HashMap();
static {
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
}
@Bean
public ConsumerFactory<String, AssessmentAttemptRequest> assessmentAttemptDetailsEntityConsumerFactory() {
JsonDeserializer<AssessmentAttemptRequest> deserializer = new JsonDeserializer<>();
deserializer.addTrustedPackages("com.lte.assessment.assessments");
return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
}
@Bean(name="aaKafkaListenerFactory")
public ConcurrentKafkaListenerContainerFactory aaKafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, AssessmentAttemptRequest> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(assessmentAttemptDetailsEntityConsumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, AssessmentQuestionAnalyticsEntity> assessmentQuestionAnalyticssEntityConsumerFactory() {
JsonDeserializer<AssessmentQuestionAnalyticsEntity> deserializer = new JsonDeserializer<>();
deserializer.addTrustedPackages("com.lte.assessment.assessments");
return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
}
@Bean(name="aqKafkaListenerFactory")
public ConcurrentKafkaListenerContainerFactory aqKafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, AssessmentQuestionAnalyticsEntity> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(assessmentQuestionAnalyticssEntityConsumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, SiteLevelAnalyticsEntity> siteLevelAnalyticsEntityConsumerFactory() {
JsonDeserializer<SiteLevelAnalyticsEntity> deserializer = new JsonDeserializer<>();
deserializer.addTrustedPackages("com.lte.assessment.assessments");
return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);
}
@Bean("slaKafkaListenerFactory")
public ConcurrentKafkaListenerContainerFactory slaKafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, SiteLevelAnalyticsEntity> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(siteLevelAnalyticsEntityConsumerFactory());
return factory;
}
}
@Service
public class TopicObserver implements
ConsumerSeekAware.ConsumerSeekCallback,ConsumerSeekAware{
@Autowired
private AssessmentAttemptService assessmentAttemptService;
@Autowired
private AssessmentQuestionService assessmentQuestionService;
@Autowired
private SiteLevelAnalyticsService siteLevelAnalyticsService;
private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();
@KafkaListener(topics = "ltetopic", groupId = "group_id", containerFactory = "aaKafkaListenerFactory")
public void consumeAttemptDetails(AssessmentAttemptRequest request) {
assessmentAttemptService.storeAttempDetails(request);
}
@KafkaListener(topics = "ltetopic", groupId = "group_id", containerFactory = "aqKafkaListenerFactory")
public void setAssessmentQeustionAnalytics(AssessmentQuestionRequest request) {
assessmentQuestionService.storeQuestionDetails(request);
}
@KafkaListener(topics = "ltetopic", groupId = "group_id", containerFactory = "slaKafkaListenerFactory")
public void siteLevelAnalytics(SiteLevelAnalyticsRequest request) {
siteLevelAnalyticsService.storeSiteLevelDetailsDetails(request);
}
}