代码之家  ›  专栏  ›  技术社区  ›  stan

Scala RestartSink未来

  •  0
  • stan  · 技术社区  · 7 年前

    我正在尝试重新创建Scala的类似功能 [RestartSink][1] 特色

    我想出了这个密码。然而,由于我们只返回 SinkShape 而不是 Sink ,我无法指定它是否应返回 Future[Done] 而不是 NotUsed 因为它是物化类型。然而,我对如何做到这一点感到困惑。我只能让它回来 [MessageActionPair, NotUsed] 而不是期望的 [MessageActionPair, Future[Done]] . 我仍然在学习这个框架,所以我确信我遗漏了一些小东西。我试着打电话 Source.toMat(RestartWithBackoffSink...) 然而,这也不能得到期望的结果。

    private final class RestartWithBackoffSink(
                                                   sourcePool:     Seq[SqsEndpoint],
                                                   minBackoff:   FiniteDuration,
                                                   maxBackoff:   FiniteDuration,
                                                   randomFactor: Double) extends GraphStage[SinkShape[MessageActionPair]] { self ⇒
    
      val in = Inlet[MessageActionPair]("RoundRobinRestartWithBackoffSink.in")
    
      override def shape = SinkShape(in)
      override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
        "Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false) {
        override protected def logSource = self.getClass
    
        override protected def startGraph() = {
          val sourceOut = createSubOutlet(in)
          Source.fromGraph(sourceOut.source).runWith(createSink(getEndpoint))(subFusingMaterializer)
        }
    
        override protected def backoff() = {
          setHandler(in, new InHandler {
            override def onPush() = ()
          })
        }
    
        private def createSink(endpoint: SqsEndpoint): Sink[MessageActionPair, Future[Done]] = {
          SqsAckSink(endpoint.queue.url)(endpoint.client)
        }
    
        def getEndpoint: SqsEndpoint = {
          if(isTimedOut) {
            index = (index + 1) % sourcePool.length
            restartCount = 0
          }
          sourcePool(index)
        }
    
        backoff()
      }
    }
    

    此处出现语法错误,因为类型不匹配:

    def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourcePool: Seq[SqsEndpoint]): Sink[MessageActionPair, Future[Done]] = {
        Sink.fromGraph(new RestartWithBackoffSink(sourcePool, minBackoff, maxBackoff, randomFactor))
      }
    
    1 回复  |  直到 7 年前
        1
  •  2
  •   Stefano Bonetti    7 年前

    通过扩展 extends GraphStage[SinkShape[MessageActionPair]] 您正在定义一个没有具体化价值的阶段。或者更好地定义一个具体化为 NotUsed .

    你必须决定你的舞台是否能变成任何有意义的东西。关于阶段物化值的更多信息 here .

    如果是的话 :您必须扩展 GraphStageWithMaterializedValue[SinkShape[MessageActionPair], Future[Done]] 并正确覆盖 createLogicAndMaterializedValue 作用有关更多指导,请参阅 docs .

    如果没有 :您可以根据以下内容更改类型

    def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, sourcePool: Seq[SqsEndpoint]): Sink[MessageActionPair, NotUsed] = {
        Sink.fromGraph(new RestartWithBackoffSink(sourcePool, minBackoff, maxBackoff, randomFactor))
      }