代码之家  ›  专栏  ›  技术社区  ›  SourceSimian

source.queue提供意外行为

  •  0
  • SourceSimian  · 技术社区  · 6 年前

    不管我为source.queue的bufferSize和overflowStrategy参数提供什么参数,结果总是类似于底部的输出。我希望能够看到报价调用和报价结果或多或少立即完成,能够看到不同的处理,并根据缓冲大小和溢出策略提供结果消息。我在这里做错什么了?


    代码:

      def main(args: Array[String]): Unit = {
        implicit val system: ActorSystem = ActorSystem("scratch")
        implicit val materializer: ActorMaterializer = ActorMaterializer()
        implicit val executionContext: ExecutionContextExecutor = system.dispatcher
    
        val start = Instant.now()
        def elapsed = time.Duration.between(start, Instant.now()).toMillis
        val intSource = Source.queue[Int](2, OverflowStrategy.dropHead)
        val intSink = Sink foreach { ii: Int =>
          Thread.sleep(1000)
          println(s"processing $ii at $elapsed")
        }
        val intChannel = intSource.to(intSink).run()
        (1 to 4) map { ii =>
          println(s"offer invocation for $ii at $elapsed")
          (ii, intChannel.offer(ii))
        } foreach { intFutureOfferResultPair =>
          val (ii, futureOfferResult) = intFutureOfferResultPair
          futureOfferResult onComplete { offerResult =>
            println(s"offer result for $ii: $offerResult at $elapsed")
          }
        }
        intChannel.complete()
    
        intChannel.watchCompletion.onComplete { _ => system.terminate() }
      }
    

    输出:

    offer invocation for 1 at 72
    offer invocation for 2 at 77
    offer invocation for 3 at 77
    offer invocation for 4 at 77
    offer result for 1: Success(Enqueued) at 90
    processing 1 at 1084
    offer result for 2: Success(Enqueued) at 1084
    processing 2 at 2084
    offer result for 3: Success(Enqueued) at 2084
    processing 3 at 3084
    offer result for 4: Success(Enqueued) at 3084
    processing 4 at 4084
    
    1 回复  |  直到 6 年前
        1
  •  0
  •   SourceSimian    6 年前

    我可以通过替换得到预期的行为:

    val intChannel = intSource.to(intSink).run()
    

    用:

    val (intChannel, futureDone) = intSource.async.toMat(intSink)(Keep.both).run()
    

    还有:

    intChannel.watchCompletion.onComplete { _ => system.terminate() }
    

    用:

    futureDone.onComplete { _ => system.terminate() }
    

    固定代码:

      def main(args: Array[String]): Unit = {
        implicit val system: ActorSystem = ActorSystem("scratch")
        implicit val materializer: ActorMaterializer = ActorMaterializer()
        implicit val executionContext: ExecutionContextExecutor = system.dispatcher
    
        val start = Instant.now()
        def elapsed = time.Duration.between(start, Instant.now()).toMillis
        val intSource = Source.queue[Int](2, OverflowStrategy.dropHead)
        val intSink = Sink foreach { ii: Int =>
          Thread.sleep(1000)
          println(s"processing $ii at $elapsed")
        }
        val (intChannel, futureDone) = intSource.async.toMat(intSink)(Keep.both).run()
        (1 to 4) map { ii =>
          println(s"offer invocation for $ii at $elapsed")
          (ii, intChannel.offer(ii))
        } foreach { intFutureOfferResultPair =>
          val (ii, futureOfferResult) = intFutureOfferResultPair
          futureOfferResult onComplete { offerResult =>
            println(s"offer result for $ii: $offerResult at $elapsed")
          }
        }
        intChannel.complete()
    
        futureDone.onComplete { _ => system.terminate() }
      }
    

    产量

    offer invocation for 1 at 84
    offer invocation for 2 at 89
    offer invocation for 3 at 89
    offer invocation for 4 at 89
    offer result for 3: Success(Enqueued) at 110
    offer result for 4: Success(Enqueued) at 110
    offer result for 1: Success(Enqueued) at 110
    offer result for 2: Success(Enqueued) at 110
    processing 3 at 1102
    processing 4 at 2102