代码之家  ›  专栏  ›  技术社区  ›  Victor

用于递增计数器的并行流中的窥视

  •  0
  • Victor  · 技术社区  · 6 年前

    File file = articles.parallelStream( )
                        .map( article -> {
                            String fileName = processer.getFriendlyName( article, locale );
                            currentCount.incrementAndGet();
                            return new ImmutablePair<>( fileName, converted );
                        } )
                        .peek( pair -> statusMessageSender.sendStatusMessage( totalCount, currentCount.get(), pair.getKey( ) ) )
                        .collect( new Archiver( archivePath ) );
    

    通过阅读javadocs,我不完全确定应该发送当前进度状态的计数器是否在工作(基本上,在这里的文档中寻找保证)

    对于平行流管道,可以在 上游作业。

    在我看来,无论文件名相对于处理顺序是否正确,观察者都将获得当前计数,这很好。但到了最后,我还是不相信偷看的结果,并在sendStatusMessage的接收器上实现同步。

    最后,我在寻找一种方式来发送状态在一个平行流,有什么想法吗?

    1 回复  |  直到 6 年前
        1
  •  0
  •   Victor    6 年前

    最初,讨论中有很多关于peek的内容,以及为什么我要从映射表达式中分离消息传递部分。这更多的是一个风格问题,因为我倾向于使用映射函数进行映射,而不是更多。

    我能理解为什么人们会为偷窥辩护或反对它。但是按钮行它的作用是消耗一个值并将其传递到管道中。所以,当我在寻找一个并行行为(传递消息)时,peek函数看起来很完美。

    最后,计数器可能在peek部分,消息接收器也是这里唯一的真实因素。消息接收器可以有自己的计数器,或者只考虑在时间帧中接收的最高值。

    在功能方面,peek函数会做得很好:主要是因为管道中的序列没有排序。

    然而,消息使用者会告诉我们是否可以正确地使用该消息,考虑到只有一个使用者在使用该信息,而其他使用者没有使用该信息,最后的结论是,我们在协议设计中遇到了问题,而不是围绕peek函数。我们从std信息中删除了计数器,问题消失了。peek可以安全地解决这个问题,是的,但是。。。

    可能是:

    File archive = articles.parallelStream( )
                           .map( article -> {
                               File converted = converter.getFile( ... );
                               String fileName = converter.getFriendlyName( ... );
                               return new ImmutablePair<>( fileName, converted );
                           } )
                           .peek( pair -> statusMessageSender.sendStatusMessage( pair.getKey() ) )
                           .collect( new Archiver( archivePath, deleteArchivedFiles ) );
    

    File archive = articles.parallelStream( )
                           .map( article -> {
                               File converted = converter.getFile( ... );
                               String fileName = converter.getFriendlyName( ... );
                               return new ImmutablePair<>( fileName, converted );
                           } )
                           .peek( pair -> statusMessageSender.sendStatusMessage( currentCount.incrementAndGet(), pair.getKey() ) )
                           .collect( new Archiver( archivePath, deleteArchivedFiles ) );