代码之家  ›  专栏  ›  技术社区  ›  Evan M.

使用Akka流从数据库中传输记录

  •  4
  • Evan M.  · 技术社区  · 7 年前

    我有一个使用Akka的系统,该系统目前通过消息队列处理传入的流数据。当一个记录到达时,它会被处理,mq会被确认,记录会被传递到系统内进行进一步处理。

    现在,我想添加对使用DBs作为输入的支持。

    2 回复  |  直到 6 年前
        1
  •  11
  •   Ramón J Romero y Vigil    6 年前

    Slick库

    Slick streaming 通常是这样做的。

    稍微扩展slick文档以包括akka streams:

    //SELECT Name from Coffees
    val q = for (c <- coffees) yield c.name
    
    val action = q.result
    
    type Name = String
    
    val databasePublisher : DatabasePublisher[Name] = db stream action
    
    import akka.stream.scaladsl.Source
    
    val akkaSourceFromSlick : Source[Name, _] = Source fromPublisher databasePublisher
    

    现在 akkaSourceFromSlick 就像其他akka溪流一样 Source .

    ResultSet 没有浮油,就像akka溪流的“引擎”。我们将利用以下事实: 来源 Iterator .

    首先使用标准jdbc技术创建结果集:

    import java.sql._
    
    val resultSetGenerator : () => Try[ResultSet] = Try {
      val statement : Statement = ???
      statement executeQuery "SELECT Name from Coffees"
    }
    

    val adjustResultSetBeforeFirst : (ResultSet) => Try[ResultSet] = 
      (resultSet) => Try(resultSet.beforeFirst()) map (_ => resultSet)
    

    val getNameFromResultSet : ResultSet => Name = _ getString "Name"
    

    现在我们可以实现 迭代器 Iterator[Name] 从结果集:

    val convertResultSetToNameIterator : ResultSet => Iterator[Name] = 
      (resultSet) => new Iterator[Try[Name]] {
        override def hasNext : Boolean  = resultSet.next
        override def next() : Try[Name] = Try(getNameFromResultSet(resultSet))
       } flatMap (_.toOption)
    

    Source.fromIterator :

    val resultSetGenToNameIterator : (() => Try[ResultSet]) => () => Iterator[Name] = 
      (_ : () => Try[ResultSet])
        .andThen(_ flatMap adjustResultSetBeforeFirst) 
        .andThen(_ map convertResultSetToNameIterator) 
        .andThen(_ getOrElse Iterator.empty)
    

    该迭代器现在可以为源提供信息:

    val akkaSourceFromResultSet : Source[Name, _] = 
      Source fromIterator resultSetGenToNameIterator(resultSetGenerator)
    

    这种实现一直到数据库都是被动的。由于ResultSet一次预取的行数有限,因此数据只能作为流通过数据库从硬盘驱动器中取出 Sink 信号需求。

        2
  •  1
  •   Sarin Madarasmi    4 年前

    我发现,与Java Publisher界面相比,Alpakka文档非常优秀,是一种更容易处理反应流的方法。

    带Slick的Alpakka文件: https://doc.akka.io/docs/alpakka/current/slick.html

    https://github.com/akka/alpakka