
RxJava2 ConnectableObservable: replay does not emit all previous items

  • j2emanue  · 6 years ago
    List<Integer> list = new ArrayList<Integer>();
    for (int j = 1; j <= 3; j++)
        list.add(j);

    Observable<Integer> observable = Observable.fromIterable(list)
        .replay()
        .autoConnect();

    observable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.v("consumer1:", "" + integer);
        }
    });

    observable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.v("consumer2:", "" + integer);
        }
    });

    observable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.v("consumer3:", "" + integer);
        }
    });


    When I run the code above, I get the following output:

    consumer1:: 1
    consumer1:: 2
    consumer1:: 3
    consumer2:: 1
    consumer2:: 2
    consumer2:: 3
    consumer3:: 1
    consumer3:: 2
    consumer3:: 3
    

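    I expected replay to really "replay" all of the history that came before, so that every previously emitted stream would be emitted again. Specifically, this is the output I expected: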

    //first time nothing to replay so just do the work
    consumer1:: 1
    consumer1:: 2
    consumer1:: 3
    
    //replay consumer1 stream:
    consumer2:: 1
    consumer2:: 2
    consumer2:: 3
    //already replayed now do the work
    consumer2:: 1
    consumer2:: 2
    consumer2:: 3
    
    //replay consumer1 stream:
    consumer3:: 1
    consumer3:: 2
    consumer3:: 3
    
    //replay consumer2 stream:
    consumer3:: 1
    consumer3:: 2
    consumer3:: 3
    //now do the work
    consumer3:: 1
    consumer3:: 2
    consumer3:: 3
    

    1 Answer
  •   akarnokd    6 years ago

    I'm not sure why you want this exotic behavior, but you can get it with defer and repeat:

    List<Integer> list = new ArrayList<Integer>();
    for (int j = 1; j <= 3; j++) {
        list.add(j);
    }
    
    
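    // Counts subscriptions so that the n-th subscriber receives the sequence n times.
    AtomicInteger count = new AtomicInteger();
    
    Observable<Integer> observable = 
        Observable.defer(() ->
            // defer runs this lambda once per subscriber, so the counter
            // is incremented on every subscribe call.
            Observable.fromIterable(list)
                .replay()
                .autoConnect()
                .repeat(count.incrementAndGet())
        );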
    
    observable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.v("consumer1:", ""+integer);
        }
    });
    
    observable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.v("consumer2:", "" + integer);
        }
    });
    
    observable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.v("consumer3:", ""+integer);
        }
    });
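
To make the effect of the per-subscription counter easier to follow, here is a minimal plain-Java sketch that models the idea without any RxJava dependency. It is not RxJava code and not how the library is implemented: `DeferRepeatModel`, `subscribe` and `collect` are hypothetical names introduced only for illustration, and the model assumes synchronous delivery on the calling thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical plain-Java model (no RxJava): every subscription bumps a shared
// counter, and the subscriber then receives the source list that many times.
public class DeferRepeatModel {
    private final List<Integer> source;
    private final AtomicInteger count = new AtomicInteger();

    public DeferRepeatModel(List<Integer> source) {
        this.source = source;
    }

    // Stands in for defer(...) combined with repeat(count.incrementAndGet()):
    // the repeat count is decided at subscription time, not at assembly time.
    public void subscribe(Consumer<Integer> consumer) {
        int times = count.incrementAndGet();
        for (int i = 0; i < times; i++) {
            for (Integer item : source) {
                consumer.accept(item);
            }
        }
    }

    // Subscribes once and returns everything that subscriber received.
    public List<Integer> collect() {
        List<Integer> received = new ArrayList<>();
        subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        DeferRepeatModel model = new DeferRepeatModel(Arrays.asList(1, 2, 3));
        System.out.println("consumer1: " + model.collect()); // [1, 2, 3]
        System.out.println("consumer2: " + model.collect()); // [1, 2, 3, 1, 2, 3]
        System.out.println("consumer3: " + model.collect()); // [1, 2, 3, 1, 2, 3, 1, 2, 3]
    }
}
```

In this model the first subscriber sees the list once, the second twice, and the third three times, which matches the item counts in the expected output from the question.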