代码之家  ›  专栏  ›  技术社区  ›  Jamie McCrindle

通用可取消异步操作接口的设计

  •  3
  • Jamie McCrindle  · 技术社区  · 14 年前

    • 支持异步调用
    • 可取消
    • 支持请求/响应
    • 支持在当前线程中返回或在另一个响应中处理响应

    这是我的第一次尝试:

    interface AsyncOperation<INPUT, OUTPUT> {
        Future<OUTPUT> execute(INPUT in, AsyncCallback<OUTPUT> callback);
    }
    
    interface AsyncCallback<OUTPUT> {
        void done(OUTPUT output);
    }
    

    // completely async operation
    operation.execute("Test", new AsyncCallback<String> {
        public void done(String output) {
            // process result...
        }
    });
    
    // sync operation with cancellation after timeout
    Future<String> future = operation.execute("Test", null);
    try {
        String result = future.get(1000);
    } catch(TimeoutException ex) {
        future.cancel();
    }
    

    缺点

    • 这很复杂

    对于某些上下文,Google协议缓冲区服务方法遵循一个相对类似的模型:

    void [methodname](RpcController controller, 
        [RequestClass] request, RpcCallback<[ResponseClass]> callback);
    

    有更好的主意吗?

    3 回复  |  直到 14 年前
        1
  •  1
  •   meriton    14 年前

    是否需要输入类型参数?将输入作为状态保存的操作对象是否更容易,如:

    void greet(final String name) {
        new AsyncOperation<Object>() {
            @Override Object doIt() {
                System.out.println("Hello " + name + "!");
            }
        }.execute(null);
    }
    

    这样,调用者就可以以类型安全的方式传递任意多个参数。

    此外,调用回调并返回未来似乎是一种奇怪的用法。你确定你需要吗?您可以提供两个execute方法,一个返回future,另一个调用回调。

        2
  •  0
  •   Abhinav Sarkar    14 年前

    看一看 jetlang . 它支持异步操作和请求-响应模型。下面是他们测试的一个例子:

    @Test
    public void simpleRequestResponse() throws InterruptedException {
        Fiber req = new ThreadFiber();
        Fiber reply = new ThreadFiber();
        req.start();
        reply.start();
        RequestChannel<String, Integer> channel = new MemoryRequestChannel<String, Integer>();
        Callback<Request<String, Integer>> onReq = new Callback<Request<String, Integer>>() {
            public void onMessage(Request<String, Integer> message) {
                assertEquals("hello", message.getRequest());
                message.reply(1);
            }
        };
        channel.subscribe(reply, onReq);
    
        final CountDownLatch done = new CountDownLatch(1);
        Callback<Integer> onReply = new Callback<Integer>() {
            public void onMessage(Integer message) {
                assertEquals(1, message.intValue());
                done.countDown();
            }
        };
        AsyncRequest.withOneReply(req, channel, "hello", onReply);
        assertTrue(done.await(10, TimeUnit.SECONDS));
        req.dispose();
        reply.dispose();
    }
    
        3
  •  0
  •   Jamie McCrindle    14 年前

    好吧,回答你自己的问题很糟糕,但我已经提出了一组非常适合这个问题的类:

    // the main Async operation interface
    public interface AsyncOperation<OUTPUT, INPUT> {
        public AsyncController execute(INPUT input, 
            AsyncCallback<OUTPUT> callback);
    }
    
    // the callback that gets called when the operation completes
    public interface AsyncCallback<OUTPUT> {
        public void done(AsyncResult<OUTPUT> output);
    }
    
    // provides the ability to cancel operations
    public interface AsyncController {
        void cancel();
    }
    
    // this provides a convenient way to manage a response that is either a
    // value or an exception
    public class AsyncResult<VALUE> {
    
        private final VALUE value;
        private final Throwable throwable;
    
        private AsyncResult(VALUE value, Throwable throwable) {
            this.value = value;
            this.throwable = throwable;
        }
    
        public AsyncResult(VALUE value) {
            this(value, null);
        }
    
        public AsyncResult(Throwable throwable) {
            this((VALUE) null, throwable);
        }
    
        public VALUE value() throws Throwable {
            if(throwable != null) {
                throw throwable;
            } else {
                return value;
            }
        }
    }