代码之家  ›  专栏  ›  技术社区  ›  tkruse

检查是否已使用java流

  •  1
  • tkruse  · 技术社区  · 5 年前

    我如何检查流实例是否已被使用(这意味着调用了终端操作,因此对终端操作的任何进一步调用可能会失败) IllegalStateException: stream has already been operated upon or closed. ?

    理想情况下,我想要一个方法,如果流还没有被消耗,就不消耗它;如果流已经被消耗,而没有捕捉到错误,则返回布尔值false IllegalStateException 来自流方法(因为对控制流使用异常代价高昂且容易出错,尤其是在使用标准异常时)。

    类似于 hasNext() 在迭代器中,异常抛出和布尔返回行为(尽管没有 next() ).

    例子:

    public void consume(java.util.function.Consumer<Stream<?>> consumer, Stream<?> stream) {
       consumer.accept(stream);
       // defensive programming, check state
       if (...) {
           throw new IllegalStateException("consumer must call terminal operation on stream");
       }
    }
    

    目标是,如果客户机代码在不使用流的情况下调用此方法,那么会尽早失败。

    似乎没有办法做到这一点,我必须添加一个try-catch块来调用任何终端操作,比如 iterator() ,捕获一个异常并抛出一个新异常。

    一个可接受的答案也可以是“不存在解决方案”,并有充分的理由说明为什么规范不能添加这种方法(如果存在充分的理由)。JDK流似乎通常在其终端方法的开头有这样的片段:

    // in AbstractPipeline.java
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    

    因此,对于这些流来说,实现这种方法似乎没有那么困难。

    0 回复  |  直到 5 年前
        1
  •  4
  •   Eugene    5 年前

    考虑到 spliterator (例如)是一个终端操作,您只需创建如下方法:

    private static <T> Optional<Stream<T>> isConsumed(Stream<T> stream) {
    
        Spliterator<T> spliterator;
        try {
            spliterator = stream.spliterator();
        } catch (IllegalStateException ise) {
            return Optional.empty();
        }
    
        return Optional.of(StreamSupport.stream(
            () -> spliterator,
            spliterator.characteristics(),
            stream.isParallel()));
    }
    

    我不知道有什么更好的方法。。。用法是:

    Stream<Integer> ints = Stream.of(1, 2, 3, 4)
                                     .filter(x -> x < 3);
    
    YourClass.isConsumed(ints)
             .ifPresent(x -> x.forEachOrdered(System.out::println));
    

    因为我认为没有实际的理由返回已经消耗的流,所以我返回 Optional.empty() 相反

        2
  •  1
  •   tkruse    5 年前

    一种解决方案是添加中间操作(例如。 filter() )到 stream 在把它交给 consumer .在该操作中,您只需保存调用该操作的状态(例如,使用 AtomicBoolean ):

    public <T> void consume(Consumer<Stream<T>> consumer, Stream<T> stream) {
        AtomicBoolean consumed = new AtomicBoolean(false);
        consumer.accept(stream.filter(i -> {
            consumed.set(true);
            return true;
        }));
        if (!consumed.get()) {
            throw new IllegalStateException("consumer must call terminal operation on stream");
        }
    }
    

    旁注: 不要使用 peek() 因为它不是通过短路终端操作调用的(比如 findAny() ).

        3
  •  0
  •   roookeee    5 年前

    下面是一个独立的可编译解决方案,它使用委托定制 Spliterator<T> 实施+an AtomicBoolean 在不损失线程安全性或不影响 Stream<T> .

    主要条目是 Stream<T> track(Stream<T> input, Consumer<Stream<T>> callback) 函数-您可以在回调函数中执行任何操作。我首先修补了一个授权 流(<);T> 实现,但它的接口太大,无法在没有任何问题的情况下进行委托(甚至请参见我的代码注释) 拆分器<T> 在授权时有它的警告):

    import java.util.Spliterator;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.Consumer;
    import java.util.stream.IntStream;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;
    
    class StackOverflowQuestion56927548Scratch {
    
        private static class TrackingSpliterator<T> implements Spliterator<T> {
            private final AtomicBoolean tracker;
            private final Spliterator<T> delegate;
            private final Runnable callback;
    
            public TrackingSpliterator(Stream<T> forStream, Runnable callback) {
                this(new AtomicBoolean(true), forStream.spliterator(), callback);
            }
    
            private TrackingSpliterator(
                    AtomicBoolean tracker,
                    Spliterator<T> delegate,
                    Runnable callback
            ) {
                this.tracker = tracker;
                this.delegate = delegate;
                this.callback = callback;
            }
    
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                boolean advanced = delegate.tryAdvance(action);
                if(tracker.compareAndSet(true, false)) {
                    callback.run();
                }
                return advanced;
            }
    
            @Override
            public Spliterator<T> trySplit() {
                Spliterator<T> split = this.delegate.trySplit();
                //may return null according to JavaDoc
                if(split == null) {
                    return null;
                }
                return new TrackingSpliterator<>(tracker, split, callback);
            }
    
            @Override
            public long estimateSize() {
                return delegate.estimateSize();
            }
    
            @Override
            public int characteristics() {
                return delegate.characteristics();
            }
        }
    
        public static <T> Stream<T> track(Stream<T> input, Consumer<Stream<T>> callback) {
            return StreamSupport.stream(
                    new TrackingSpliterator<>(input, () -> callback.accept(input)),
                    input.isParallel()
            );
        }
    
        public static void main(String[] args) {
            //some big stream to show it works correctly when parallelized
            Stream<Integer> stream = IntStream.range(0, 100000000)
                    .mapToObj(Integer::valueOf)
                    .parallel();
            Stream<Integer> trackedStream = track(stream, s -> System.out.println("consume"));
    
            //dummy consume
            System.out.println(trackedStream.anyMatch(i -> i.equals(-1)));
        }
    }
    

    只需返回 track 功能,也许能适应 callback 参数类型(您可能不需要通过流),这样就可以了。

    请注意,这个实现只跟踪流实际被消耗的时间,调用 .count() Stream 这是由例如。 IntStream.range(0,1000) (没有任何过滤步骤等)将不会使用流,而是通过 Spliterator<T>.estimateSize() !