我有一个流服务,它可以无限期地从服务器流到客户端,直到客户端取消。
在服务器端,我有一个线程,它用来自数据库的数据填充ehcache。
Ehcache提供对缓存事件的回调,即添加项时,删除项时,等等。我只关心在元素放入缓存时通知客户端,因此当客户端连接到我的gRPC服务时,我注册一个
notifyElementPut()
带有缓存的回调,其中包含对已连接客户端的引用
StreamObserver
:
public class GrpcAwareCacheEventListener extends CacheEventListenerAdapter {
private StreamObserver<FooUpdateResponse> responseObserver;
public GrpcAwareCacheEventListener(
StreamObserver<FooUpdateResponse> responseObserver) {
this.responseObserver = responseObserver;
}
@Override
public void notifyElementPut(Ehcache cache, Element element) throws CacheException {
Foo foo = (Foo) element.getObjectValue();
if (foo != null) {
responseObserver.onNext(
FooResponse.newBuilder().setFoo(foo).build());
}
}
}
我的流媒体foo服务如下:
public void streamFooUpdates(Empty request,
StreamObserver<FooResponse> responseObserver) {
final CacheEventListener eventListener = new GrpcAwareCacheEventListener(responseObserver);
fooCache.getCacheEventNotificationService().registerListener(eventListener);
Context.current().withCancellation().addListener(new CancellationListener() {
public void cancelled(Context context) {
log.info("inside context cancelled callback");
fooCache.getCacheEventNotificationService().unregisterListener(eventListener);
}
}, ForkJoinPool.commonPool());
}
这一切工作正常,只要客户端连接,就会收到所有foo更新的通知。
但是,在客户机断开连接或显式取消调用之后,我希望服务器上下文的取消侦听器将启动,从而用缓存注销回调。
不管客户机是关闭通道还是显式取消调用,情况都不是这样。(我希望服务器端取消的上下文为这两个事件触发)。我想知道客户端的cancel语义是否不正确,下面是我的客户端代码,取自一个测试用例:
Channel channel = ManagedChannelBuilder.forAddress("localhost", 25001)
.usePlaintext().build();
FooServiceGrpc.FooService stub = FooServiceGrpc
.newStub(channel);
ClientCallStreamObserver<FooResponse> cancellableObserver = new ClientCallStreamObserver<FooResponse>(){
public void onNext(FooResponse response) {
log.info("received foo: {}", response.getFoo());
}
public void onError(Throwable throwable) {
}
public void onCompleted() {
}
public boolean isReady() {
return false;
}
public void setOnReadyHandler(Runnable runnable) {
}
public void disableAutoInboundFlowControl() {
}
public void request(int i) {
}
public void setMessageCompression(boolean b) {
}
public void cancel(@Nullable String s, @Nullable Throwable throwable) {
}
};
stub.streamFooUpdates(Empty.newBuilder().build(), cancellableObserver);
Thread.sleep(10000); // sleep 10 seconds while messages are received.
cancellableObserver.cancel("cancelling from test", null); //explicit cancel
((ManagedChannel) chan).shutdown().awaitTermination(5, TimeUnit.SECONDS); //shutdown as well, for good measure.
Thread.sleep(7000); //channel should be shutdown by now.
}
谢谢!