我正试图在SpringBoot应用程序中使用SpringCloud流绑定器Kafka实现一次性交付。
我使用的版本是:
-
spring-cloud-stream-binder-kafka-core-1.2.1.发布
-
spring-cloud-stream-binder-kafka-1.2.1.发布
-
spring-cloud-stream-codec-1.2.2.发布spring-kafka-1.1.6.发布
-
Spring-Integration-Kafka-2.1.0.发布
-
Spring-Integration-Core-4.3.10.发布
-
动物园管理员-3.4.8
-
卡夫卡版本:0.10.1.1
这是我的配置(云配置):
spring:
autoconfigure:
exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
kafka:
consumer:
enable-auto-commit: false
cloud:
stream:
kafka:
binder:
brokers: "${BROKER_HOST:xyz-aws.local:9092}"
headers:
- X-B3-TraceId
- X-B3-SpanId
- X-B3-Sampled
- X-B3-ParentSpanId
- X-Span-Name
- X-Process-Id
zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
bindings:
feed_platform_events_input:
consumer:
autoCommitOffset: false
binders:
xyzkafka:
type: kafka
bindings:
feed_platform_events_input:
binder: xyzkafka
destination: platform-events
group: br-platform-events
我有两个主要班:
进料槽接口:
package au.com.xyz.proxy.interfaces;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;
public interface FeedSink {
String FEED_PLATFORM_EVENTS_INPUT = "feed_platform_events_input";
@Input(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
MessageChannel feedlatformEventsInput();
}
事件消费者
package au.com.xyz.proxy.consumer;
@Slf4j
@EnableBinding(FeedSink.class)
public class EventConsumer {
public static final String SUCCESS_MESSAGE =
"SEND-SUCCESS : Successfully sent message to platform.";
public static final String FAULT_MESSAGE = "SOAP-FAULT Code: {}, Description: {}";
public static final String CONNECT_ERROR_MESSAGE = "CONNECT-ERROR Error Details: {}";
public static final String EMPTY_NOTIFICATION_ERROR_MESSAGE =
"EMPTY-NOTIFICATION-ERROR Empty Event Received from platform";
@Autowired
private CapPointService service;
@StreamListener(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
/**
* method associated with stream to process message.
*/
public void message(final @Payload EventNotification eventNotification,
final @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
String caseMilestone = "UNKNOWN";
if (!ObjectUtils.isEmpty(eventNotification)) {
SysMessage sysMessage = processPayload(eventNotification);
caseMilestone = sysMessage.getCaseMilestone();
try {
ClientResponse response = service.sendPayload(sysMessage);
if (response.hasFault()) {
Fault faultDetails = response.getFaultDetails();
log.error(FAULT_MESSAGE, faultDetails.getCode(), faultDetails.getDescription());
} else {
log.info(SUCCESS_MESSAGE);
}
acknowledgment.acknowledge();
} catch (Exception e) {
log.error(CONNECT_ERROR_MESSAGE, e.getMessage());
}
} else {
log.error(EMPTY_NOTIFICATION_ERROR_MESSAGE);
acknowledgment.acknowledge();
}
}
private SysMessage processPayload(final EventNotification eventNotification) {
Gson gson = new Gson();
String jsonString = gson.toJson(eventNotification.getData());
log.info("Consumed message for platform events with payload : {} ", jsonString);
SysMessage sysMessage = gson.fromJson(jsonString, SysMessage.class);
return sysMessage;
}
}
我已经将kafka和spring容器的autocommit属性设置为false。
如果您在eventconsumer类中看到,在i service.sendpayload成功且没有异常的情况下,我使用了acknowledge。我希望容器移动偏移量并轮询下一个记录。
我所观察到的是:
-
场景1-在异常被抛出并且在Kafka上没有发布新消息的情况下。没有重试处理消息,似乎没有活动。即使基础问题得到解决。我提到的问题是下游服务器不可用。是否有方法重试处理n次,然后放弃。注意:这是从上次提交的偏移量重新处理或重新调用的尝试。这不是关于Kafka实例不可用。
如果我重新启动服务(EC2实例),那么处理将从上次成功确认完成的偏移量开始。
-
场景2-如果发生异常,然后将后续消息推送到Kafka。我看到新消息被处理,偏移量被移动。这意味着我丢失了未被确认的信息。所以问题是我是否已经处理了确认。如何控制从上一次提交读取而不仅仅是最新消息并进行处理?我假设有一个内部的民意调查发生,它没有考虑到或不知道最后一条消息没有被确认。我不认为有多个线程从卡夫卡阅读。我不知道@input和@streamlistener注释是如何控制的。我假设线程由控制线程的属性consumer.concurrency控制,默认情况下它设置为1。
所以我做了研究,发现了很多链接,但不幸的是没有一个回答我的具体问题。
我看着(
https://github.com/spring-cloud/spring-cloud-stream/issues/575
)
有马吕斯的评论(
https://stackoverflow.com/users/809122/marius-bogoevici
):
请注意,Kafka不提供单独的消息确认,这是
意味着确认转化为更新最新使用的
偏移量到确认消息的偏移量(每个主题/分区)。那个
意味着,如果您将来自同一主题的消息分块打包
当然,消息可以“确认”之前的所有消息。
当有一个线程时,不确定是否是订单问题。
很抱歉发了这么长的帖子,但我想提供足够的信息。最重要的是,我试图避免在从卡夫卡消费时丢失消息,我试图看看SpringCloudstreambinder卡夫卡是否能做到这一点,或者我必须寻找替代方案。
更新日期:2018年7月6日
我看到这个帖子了
https://github.com/spring-projects/spring-kafka/issues/431
这是解决我问题的更好方法吗?我可以试试最新版本的春季卡夫卡
@KafkaListener(id = "qux", topics = "annotated4", containerFactory = "kafkaManualAckListenerContainerFactory",
containerGroup = "quxGroup")
public void listen4(@Payload String foo, Acknowledgment ack, Consumer<?, ?> consumer) {
-
这有助于控制要设置到最后一个位置的偏移量吗?
是否成功处理记录?我怎么能听你这么说呢
方法。consumer.seektoend();然后如何重置listen方法以获取该记录?
-
把消费者放在签名中是否提供支持
处理消费者问题?或者我还需要做什么?
-
我应该使用Acknowledge还是Consumer.Commitsyncy()
-
集装箱工厂的意义是什么?我必须定义它吗
像豆子一样。
-
我是否需要@enablekafka和@configuration才能使用上述方法?
记住,该应用程序是弹簧引导应用程序。
-
通过添加Consumer to Listen方法,我不需要实现
消费软件接口?
最后但并非最不重要的是,如果可行,是否可以提供上述方法的一些示例。
更新日期:2018年7月12日
谢谢,加里(
https://stackoverflow.com/users/1240763/gary-russell
)提供使用maxattempts的提示。我用过那种方法。我能够实现一次交付,并保持消息的顺序。
我更新的云配置:
spring:
autoconfigure:
exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
kafka:
consumer:
enable-auto-commit: false
cloud:
stream:
kafka:
binder:
brokers: "${BROKER_HOST:xyz-aws.local:9092}"
headers:
- X-B3-TraceId
- X-B3-SpanId
- X-B3-Sampled
- X-B3-ParentSpanId
- X-Span-Name
- X-Process-Id
zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
bindings:
feed_platform_events_input:
consumer:
autoCommitOffset: false
binders:
xyzkafka:
type: kafka
bindings:
feed_platform_events_input:
binder: xyzkafka
destination: platform-events
group: br-platform-events
consumer:
maxAttempts: 2147483647
backOffInitialInterval: 1000
backOffMaxInterval: 300000
backOffMultiplier: 2.0
事件使用者与我的初始实现保持相同。除非再次引发错误,否则容器将知道处理失败。如果您只是捕获它,那么容器就无法知道消息处理有失败。通过执行Acknoweldgement.Acknowledge,您只需控制偏移提交。为了重试,您必须抛出异常。不要忘记将kafka客户机autocommit属性和spring(容器级别)autocommitoffset属性设置为false。就是这样。