代码之家  ›  专栏  ›  技术社区  ›  Andrejs

Akka溪流减少为较小的溪流

  •  2
  • Andrejs  · 技术社区  · 7 年前

    A A A B B C C C C ... (very long)
    

    (A, 3) (B, 2) (C, 4)
    

    我可以在Akka Streams中使用哪些操作符来实现这一点?

    Source.fromPublisher(publisher)
        .aggregateSomehow()  // ?
        .runWith(sink)
    

    我已经调查过了 .groupBy

    编辑 : This question statefulMapConcat 组合器。

    1 回复  |  直到 7 年前
        1
  •  1
  •   Andrejs    7 年前

    一种选择是使用 statefulMapConcat 组合器:

    Source(List("A", "A", "B", "B", "B", "C", "C", ""))
          .statefulMapConcat({ () =>
            var lastChar = ""
            var count = 0
    
            char => if(lastChar == char) {
                count += 1
                List.empty
              } else {
                val charCount = (lastChar, count)
                lastChar = char
                count = 1
                List(charCount)
              }
          })
        .runForeach(println)
    

    然而,这需要在输入流中附加一个元素来标记结束。

    输出:

    (,0)
    (A,2)
    (B,3)
    (C,2)
    

    感谢@chunjef在评论中的建议