代码之家  ›  专栏  ›  技术社区  ›  Falk Schuetzenmeister

Akka:如何在一个图形阶段提取值并在下一个图形阶段使用

  •  0
  • Falk Schuetzenmeister  · 技术社区  · 7 年前

    我正在使用Alpakka和Akka处理CSV文件。因为我有一堆CSV文件必须添加到同一个流中,所以我想添加一个包含文件名或请求信息的字段。目前我有这样的想法:

    val source = FileIO.fromPath(Paths.get("10002070.csv"))
      .via(CsvParsing.lineScanner())
    

    它流式处理字节测试环(字段)的列表(行)序列。目标如下:

    val filename = "10002070.csv"
    val source = FileIO.fromPath(Path.get(filename))
        .via(CsvParsing.lineScanner())
        .via(AddCSVFieldHere(filename))
    

    创建类似于以下内容的结构:

    10002070.csv,max,estimated,12,1,0
    

    其中文件名是原始源中不存在的字段。

    我觉得在流中注入值看起来不太好,而且最终我想确定在读取目录的流阶段中传递给解析的文件名。

    通过流阶段传递值以供以后重用的正确/规范方法是什么?

    1 回复  |  直到 7 年前
        1
  •  1
  •   Jeffrey Chung    7 年前

    您可以使用 map 将文件名添加到每个 List[ByteString] :

    val fileName = "10002070.csv"
    val source =
      FileIO.fromPath(Path.get(fileName))
        .via(CsvParsing.lineScanner())
        .map(List(ByteString(fileName)) ++ _)
    

    例如:

    Source.single(ByteString("""header1,header2,header3
                               |1,2,3
                               |4,5,6""".stripMargin))
      .via(CsvParsing.lineScanner())
      .map(List(ByteString("myfile.csv")) ++ _)
      .runForeach(row => println(row.map(_.utf8String)))
    
    // The above code prints the following:
    // List(myfile.csv, header1, header2, header3)
    // List(myfile.csv, 1, 2, 3)
    // List(myfile.csv, 4, 5, 6)
    

    同样的方法也适用于更一般的情况,即您事先不知道文件名。如果您想读取目录中的所有文件(假设所有这些文件都是csv文件),将文件连接到单个流中,并在每个流元素中保留文件名,那么您可以使用Alpakka的 Directory 按以下方式使用:

    val source =
      Directory.ls(Paths.get("/my/dir")) // Source[Path, NotUsed]
        .flatMapConcat { path =>
           FileIO.fromPath(path)
             .via(CsvParsing.lineScanner())
             .map(List(ByteString(path.getFileName.toString)) ++ _)
        }