代码之家  ›  专栏  ›  技术社区  ›  mdm

FS2流异常处理不工作

  •  3
  • mdm  · 技术社区  · 7 年前

    我对FS2和异常处理有问题。我想要的是,如果 Stream[IO,A] ,当我使用 f: A => B 可以引发异常,我获得 Stream[IO,Either[Throwable,B]] .

    我尝试了以下方法,效果如预期:

    import cats.effect.IO
    val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
      .map(x => x * x)
      .map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
      .attempt
    x1.compile.toVector.unsafeRunSync().foreach(println)
    

    它打印:

    Right(1)
    Right(4)
    Left(java.lang.RuntimeException: I don't like 9s)
    

    然而,我的问题开始于我试图用它做任何事情 Stream .

    val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
      .map(x => x * x)
      .map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
      .attempt.map(identity)
    
    x1.compile.toVector.unsafeRunSync().foreach(println)
    

    引发异常并终止应用程序:

    java.lang.RuntimeException: I don't like 9s
        at swaps.fm.A$A32$A$A32.$anonfun$x1$2(tmp2.sc:7)
        at scala.runtime.java8.JFunction1$mcII$sp.apply(tmp2.sc:8)
        ...
    

    更奇怪的是,使用 take 拥有 流动 只返回我知道正常的元素,仍然以相同的方式爆炸:

    val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
      .map(x => x * x)
      .map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
      .attempt.take(2)
    
    x1.compile.toVector.unsafeRunSync().foreach(println)
    

    有人能澄清为什么会发生这种情况吗?这是错误还是(非)预期行为?

    N、 B。 FS2中存在这种行为 0.10.0-M7 0.10.0

    2 回复  |  直到 7 年前
        1
  •  2
  •   Daenyth    6 年前

    这里的问题是 fs2 您必须编写纯代码。抛出异常并不是纯粹的,因此,如果您希望管道中的某个步骤可能会失败,则需要将其显式化。有两种方法:

    import cats.effect.IO
    val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
      .map(x => x * x)
      .map{ i => if(i == 9) Left[Throwable, Int](new RuntimeException("I don't like 9s")) else Right(i)}
    x1.compile.toVector.unsafeRunSync().foreach(println)
    // Explicit Left annotation is so you can .rethrow if desired; it can be omitted or added later with .widen
    

    import cats.effect.IO
    val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
      .map(x => x * x)
      .flatMap { i => if(i == 9) Stream.raiseError(new RuntimeException("I don't like 9s")) else Stream.emit(i) }
      .attempt
    x1.compile.toVector.unsafeRunSync().foreach(println)
    

    其中第一种更可取,因为 flatMap 具有 emit 将导致通常效率较低的大小为1的块。如果要在出现第一个错误时停止处理,请添加 .rethrow 到小溪的尽头。

        2
  •  1
  •   Yuval Itzchakov    6 年前

    问题似乎在这里:

    self
     .stage(depth.increment, defer, o => emit(f(o)), os => {
       var i = 0; while (i < os.size) { emit(f(os(i))); i += 1; }
    

    这是里面的代码 Segment.map . 使用以下方法分配向量时:

    Stream.emits(Vector(1,2,3,4))
    

    fs2将分配单个段。查看上面的代码 map , os.size 代表 段的大小 意思 地图 将始终映射整个线段大小。这意味着即使你问你 take(2) ,我们仍在有效地映射整个细分市场。

    我们可以通过稍微更改代码来证明这一点:

    def main(args: Array[String]): Unit = {
      val x1 = fs2.Stream
        .emits(Vector(1, 2, 3, 4))
        .segmentLimit(1)
        .covary[IO]
        .map { seg =>
          if (seg.sum.force.run > 3) throw new RuntimeException("I don't like 9s")
          else seg
        }
        .attempt
        .take(2)
    
    println(x1.compile.toVector.unsafeRunSync())
    

    这里最重要的部分是 segmentLimit ,这使得流将内部流动的数据分块到大小为1的段。运行此代码时,我们得到:

    Vector(Right(Chunk(1)), Right(Chunk(2)))
    

    这是不是一个bug?不确定。我会咨询Gitter频道的维护人员。

    推荐文章