代码之家  ›  专栏  ›  技术社区  ›  mancini0

当客户端取消服务调用时,不会激发gRPC服务的Context CancellationListener

  •  -1
  • mancini0  · 技术社区  · 6 年前

    我有一个流服务,它可以无限期地从服务器流到客户端,直到客户端取消。

    在服务器端,我有一个线程,它用来自数据库的数据填充ehcache。

    Ehcache提供对缓存事件的回调,即添加项时,删除项时,等等。我只关心在元素放入缓存时通知客户端,因此当客户端连接到我的gRPC服务时,我注册一个 notifyElementPut() 带有缓存的回调,其中包含对已连接客户端的引用 StreamObserver :

    public class GrpcAwareCacheEventListener extends CacheEventListenerAdapter {
    
    
      private StreamObserver<FooUpdateResponse> responseObserver;
    
      public GrpcAwareCacheEventListener(
          StreamObserver<FooUpdateResponse> responseObserver) {
        this.responseObserver = responseObserver;
      }
    
    
      @Override
      public void notifyElementPut(Ehcache cache, Element element) throws CacheException {
    
        Foo foo = (Foo) element.getObjectValue();
        if (foo != null) {
          responseObserver.onNext(
              FooResponse.newBuilder().setFoo(foo).build());
        }
      }
    }
    

    我的流媒体foo服务如下:

        public void streamFooUpdates(Empty request,
                  StreamObserver<FooResponse> responseObserver) {
    
                final CacheEventListener eventListener = new GrpcAwareCacheEventListener(responseObserver);
                fooCache.getCacheEventNotificationService().registerListener(eventListener);
                Context.current().withCancellation().addListener(new CancellationListener() {
    
                  public void cancelled(Context context) {
        log.info("inside context cancelled callback");      
      fooCache.getCacheEventNotificationService().unregisterListener(eventListener);
                  }
    
                }, ForkJoinPool.commonPool());
    
    
    
              }
    

    这一切工作正常,只要客户端连接,就会收到所有foo更新的通知。

    但是,在客户机断开连接或显式取消调用之后,我希望服务器上下文的取消侦听器将启动,从而用缓存注销回调。

    不管客户机是关闭通道还是显式取消调用,情况都不是这样。(我希望服务器端取消的上下文为这两个事件触发)。我想知道客户端的cancel语义是否不正确,下面是我的客户端代码,取自一个测试用例:

    Channel channel = ManagedChannelBuilder.forAddress("localhost", 25001)
            .usePlaintext().build();
    
        FooServiceGrpc.FooService stub = FooServiceGrpc
            .newStub(channel);
    
    
        ClientCallStreamObserver<FooResponse> cancellableObserver = new ClientCallStreamObserver<FooResponse>(){
          public void onNext(FooResponse response) {
            log.info("received foo: {}", response.getFoo());
          }
    
          public void onError(Throwable throwable) {
    
          }
    
          public void onCompleted() {
    
          }
    
          public boolean isReady() {
            return false;
          }
    
          public void setOnReadyHandler(Runnable runnable) {
    
          }
    
          public void disableAutoInboundFlowControl() {
    
          }
    
          public void request(int i) {
    
          }
    
          public void setMessageCompression(boolean b) {
    
          }
    
          public void cancel(@Nullable String s, @Nullable Throwable throwable) {
    
          }
        };
    
        stub.streamFooUpdates(Empty.newBuilder().build(), cancellableObserver);
        Thread.sleep(10000); // sleep 10 seconds while messages are received.
        cancellableObserver.cancel("cancelling from test", null); //explicit cancel
        ((ManagedChannel) chan).shutdown().awaitTermination(5, TimeUnit.SECONDS); //shutdown as well, for good measure.
    
        Thread.sleep(7000); //channel should be shutdown by now.
    
      }
    

    谢谢!

    1 回复  |  直到 5 年前
        1
  •  3
  •   mkobit    5 年前

    您没有正确取消客户端呼叫。这个 StreamObserver 关于第二个论点 stub.streamFooUpdates() 是你的回电。你不应该打那个电话 StreamObserver公司 .

    有两种方法可以取消来自客户端的呼叫。

    选项1:通过a ClientResponseObserver 作为第二个参数,implement beforeStart() ClientCallStreamObserver cancel() .

    选项2:运行 stub.streamfoo更新() 内部 CancellableContext ,并取消 Context 可取消上下文 一定要取消,这就是 finally 积木是用来做的。

    CancellableContext withCancellation = Context.current().withCancellation();
    try {
      withCancellation.run(() -> {
          stub.streamFooUpdates(...);
          Thread.sleep(10000);
          withCancellation.cancel(null);
      });
    } finally {
      withCancellation.cancel(null);
    }