代码之家  ›  专栏  ›  技术社区  ›  Leonard

如何在流结束后关闭文件?

  •  5
  • Leonard  · 技术社区  · 9 年前

    我有一个java.io.File序列流。我使用 flatMapConcat 创建新的 Source 文件,类似于:

    def func(files: List[File]) =
      Source(files)
        .map(f => openFile(f))
        .flatMapConcat(f => Source.fromPublisher(SomePublisher(f)))
        .grouped(10)
        .via(SomeFlow)
        .runWith(Sink.ignore)
    

    有没有一种简单的方法在流结束后关闭每个文件? SomePublisher() 无法关闭它。

    2 回复  |  直到 9 年前
        1
  •  2
  •   Leonard    9 年前

    所以我找到了很多方法中的一个很好的方法来解决我的问题,但如果你有其他方法,我也希望看到它。

    def someSource(file: File) = {
      val f = openFile(file)
    
      Source
        .fromPublisher(SomePublisher(f))
        .transform(() => new PushStage[?, ?] {
          override def onPush(elem: ?, ctx: Context[?]): SyncDirective = ctx.push(elem)
    
          override def postStop(): Unit = {
            f.close()
            super.postStop()
          }
        }
    }
    
    def func(files: List[File]) =
      Source(files)
        .flatMapConcat(someSource)
        .grouped(10)
        .via(SomeFlow)
        .runWith(Sink.ignore)
    
        2
  •  1
  •   Yury Sukhoverkhov    9 年前

    因此,如果我理解正确,您可以执行以下操作:为每个文件创建打开文件的数据库对象。因此,既然您在代码中打开了数据库连接,那么您就要负责关闭它。由于您使用的是有限的文件列表,所以可以按顺序存储所有数据库连接,运行流并在流结束后关闭所有连接。

    另一种方法是让您自己的发布者获得文件名,打开数据库连接,从中流式传输,关闭数据库连接。第二个选项将允许您从无限文件列表中进行流式传输。

    如果你想从我这里得到代码片段,请给我完整的函数源代码,我会更新它。