代码之家  ›  专栏  ›  技术社区  ›  sp_user123

编写大量文件的最佳方法

  •  -1
  • sp_user123  · 技术社区  · 9 年前

    我正在写很多像下面这样的文件。

    public void call(Iterator<Tuple2<Text, BytesWritable>> arg0)
            throws Exception {
        // TODO Auto-generated method stub
    
        while (arg0.hasNext()) {
            Tuple2<Text, BytesWritable> tuple2 = arg0.next();
            System.out.println(tuple2._1().toString());
            PrintWriter writer = new PrintWriter("/home/suv/junk/sparkOutPut/"+tuple2._1().toString(), "UTF-8");
            writer.println(new String(tuple2._2().getBytes()));
            writer.close();
        }
    }
    

    有什么更好的方法来写这些文件吗。。而无需每次关闭或创建打印机。

    2 回复  |  直到 9 年前
        1
  •  2
  •   Stephen C    9 年前

    没有更好的方法来编写大量文件。您所做的工作本质上是I/O密集型的。

    更新 -我认为迈克尔·安德森是对的。使用多个线程来写入文件(可能)将大大加快速度。然而,从几个方面来看,I/O仍然是最终的瓶颈:

    • 创建、打开和关闭文件涉及文件&目录元数据访问和更新。这需要非平凡的CPU。

    • 文件数据和元数据更改需要写入磁盘。这可能是多次光盘写入。

    • 每个写入的文件至少有3个系统调用。

    • 然后是缝线开销。

    除非写入每个文件的数据量很大(每个文件有几千字节),否则我怀疑使用NIO、直接缓冲区、JNI等技术是否值得。真正的瓶颈将在内核:文件系统操作和低级磁盘I/O。


    …而无需每次关闭或创建打印机。

    否。您需要创建一个新的 PrintWriter (或 Writer OutputStream )对于每个文件。

    然而,这。。。

      writer.println(new String(tuple2._2().getBytes()));
    

    …看起来很奇怪。您似乎是:

    • 使命感 getBytes() String (?),
    • 将字节数组转换为 一串
    • 调用 println() 方法 一串 它将复制它,并在最终输出它们之前将其转换回字节。

    有什么好处?字符串的意义是什么->字节->字符串转换?

    我会这样做:

      writer.println(tuple2._2());
    

    这应该更快,尽管我不认为加速百分比会那么大。

        2
  •  1
  •   Michael Anderson    9 年前

    我想你是在走最快的路。因为每个人都知道最快是最好的;)

    一个简单的方法是使用一堆线程来为您写作。 但是,除非您的文件系统能够很好地扩展,否则这样做不会带来太多好处。(我在基于Luster的集群系统上使用这一技术,在“大量文件”可能意味着10k的情况下,在这种情况下,许多写入将被发送到不同的服务器/磁盘)

    代码应该是这样的:(注意,我认为这个版本不正确,因为对于少量的文件,这会填充工作队列-但无论如何,请查看下一个版本以获得更好的版本…)

    public void call(Iterator<Tuple2<Text, BytesWritable>> arg0) throws Exception {
        int nThreads=5;
        ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
        ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(threadPool);
    
        int nJobs = 0;
    
        while (arg0.hasNext()) {
            ++nJobs;
            final Tuple2<Text, BytesWritable> tuple2 = arg0.next();
            ecs.submit(new Callable<Void>() {
              @Override Void call() {
                 System.out.println(tuple2._1().toString());
                 String path = "/home/suv/junk/sparkOutPut/"+tuple2._1().toString();
                 try(PrintWriter writer = new PrintWriter(path, "UTF-8") ) {
                   writer.println(new String(tuple2._2().getBytes()))
                 }
                 return null;
              }
           });
        }
        for(int i=0; i<nJobs; ++i) {
           ecs.take().get();
        }
    }
    

    更好的方法是,当你有第一个文件的数据时,就开始写文件,而不是当你有了所有文件的数据后,这样写就不会阻塞计算线程。

    要做到这一点,您将应用程序分成几个部分,通过(线程安全)队列进行通信。

    代码最终看起来更像这样:

    public void main() {
      SomeMultithreadedQueue<Data> queue = ...;
    
      int nGeneratorThreads=1;
      int nWriterThreads=5;
      int nThreads = nGeneratorThreads + nWriterThreads;
    
      ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
      ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(threadPool);
    
      AtomicInteger completedGenerators = new AtomicInteger(0);
    
      // Start some generator threads.
      for(int i=0; ++i; i<nGeneratorThreads) {
        ecs.submit( () -> { 
          while(...) { 
            Data d = ... ;
            queue.push(d);
          }
          if(completedGenerators.incrementAndGet()==nGeneratorThreads) {
            queue.push(null);
          }
          return null;
       });
      }
    
      // Start some writer threads
      for(int i=0; i<nWriterThreads; ++i) {
        ecs.submit( () -> { 
          Data d
          while((d = queue.take())!=null) {
            String path = data.path();
            try(PrintWriter writer = new PrintWriter(path, "UTF-8") ) {
               writer.println(new String(data.getBytes()));
            }
            return null;
          }
        });
      }
    
      for(int i=0; i<nThreads; ++i) {
        ecs.take().get();
      }
    }
    

    注意,我没有提供队列类的实现,您可以轻松地包装标准的java线程安全类以获得所需的内容。

    还有很多事情可以做,以减少延迟等-这是我用来降低时间的一些进一步的事情。。。

    1. 甚至不要等待为给定文件生成所有数据。传递另一个包含要写入的字节数据包的队列。

    2. 注意分配-您可以重用一些缓冲区。

    3. nio中存在一些延迟——通过使用C写、JNI和直接缓冲区,可以获得一些性能改进。

    4. 线程切换可能会造成伤害,队列中的延迟也可能会造成损害,因此您可能需要稍微对数据进行批处理。用1来平衡这一点可能很棘手。