I have code like this:
    @PostConstruct
    public void setUpStreamer() {
        WebClient.create(daemonEndpoint)
                .get()
                .uri("/events")
                .retrieve()
                .bodyToFlux(String.class)
                .flatMap(Mono::justOrEmpty)
                .flatMap(s -> {
                    try {
                        return Mono.just(mapper.readValue(s, Map.class));
                    } catch (IOException e) {
                        log.warn("unable to parse {} as JSON", s);
                        // Skip unparseable payloads; returning null from map() would raise an NPE
                        return Mono.empty();
                    }
                })
                .subscribe(
                        events -> {
                            log.info("service event(s) detected");
                        },
                        throwable -> log.error("Error on event stream: {}", throwable.getMessage(), throwable),
                        () -> log.warn("event stream completed")
                );
    }
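What I want is for the streamer to be recreated whenever the stream fails, that is, whenever the events endpoint drops the connection or restarts.
Something like this: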
    throwable -> {
        log.error("Error on event stream: {}", throwable.getMessage(), throwable);
        setUpStreamer();
    },
    () -> {
        log.warn("event stream completed");
        setUpStreamer();
    }
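
For comparison, here is a minimal sketch of how the same "recreate on failure or completion" behaviour might be expressed with Reactor's own `retryWhen` and `repeatWhen` operators instead of calling `setUpStreamer()` from the callbacks. Everything named here is an assumption for illustration: `ResubscribeSketch`, `connect`, `resilient` and `demo` are hypothetical names, `connect` is a stand-in for the `WebClient` call that needs no network, and the delays are arbitrary. It also assumes reactor-core 3.3.4 or later, where `reactor.util.retry.Retry` is available.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

public class ResubscribeSketch {

    // Hypothetical stand-in for the WebClient chain: each subscription is one "connection".
    // The first connection fails; every later one emits a single event and then completes.
    static Flux<String> connect(AtomicInteger connections) {
        return Flux.defer(() -> {
            int n = connections.incrementAndGet();
            if (n == 1) {
                return Flux.error(new IllegalStateException("connection dropped"));
            }
            return Flux.just("event-from-connection-" + n);
        });
    }

    // Resubscribe to the source after an error (retryWhen) and after a normal
    // completion (repeatWhen), waiting briefly each time instead of reconnecting at once.
    static Flux<String> resilient(Flux<String> source) {
        return source
                .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(10))
                        .maxBackoff(Duration.ofMillis(100)))
                .repeatWhen(completed -> completed.delayElements(Duration.ofMillis(10)));
    }

    // Collects the first two events that arrive across reconnections.
    static List<String> demo() {
        AtomicInteger connections = new AtomicInteger();
        return resilient(connect(connections))
                .take(2)
                .collectList()
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In `setUpStreamer()` the two operators would presumably sit in the chain before `.subscribe(...)`, with delays in the range of seconds rather than milliseconds. Because the `WebClient` request is only sent on subscription, each resubscription should open a new connection to `/events`, and the backoff keeps the client from hammering an endpoint that is still restarting.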