代码之家  ›  专栏  ›  技术社区  ›  brianegge

Java中并发流水线策略

  •  25
  • brianegge  · 技术社区  · 15 年前

    请考虑以下shell脚本:

    gzip -dc in.gz | sed -e 's/@/_at_/g' | gzip -c > out.gz 
    

    这有三个进程并行工作,对流进行解压缩、修改和重新压缩。运行 time 我可以看到我的用户时间大约是我实时时间的两倍,这表明程序是有效的并行工作。

    我试图在Java中创建相同的程序,将每个任务放在它自己的线程中。不幸的是,多线程Java程序仅限于 30% faster single threaded 上述样品的版本。我试过同时使用 Exchanger 和A ConcurrentLinkedQueue . ConcurrentLinkedQueue链接的队列会导致大量的争用,尽管这三个线程通常都很忙。交换机的争用率较低,但更为复杂,而且似乎不能让最慢的工作人员在100%的时间内运行。

    我试图找出一个纯Java解决这个问题,而不看一个字节代码编织框架或基于JNI的MPI。

    大多数并发性研究和api都关注 divide-and-conquer 算法,给出每个节点的工作是正交的,不依赖于先前的计算。并发性的另一种方法是管道方法,在这种方法中,每个工作线程都做一些工作,并将数据传递给下一个工作线程。

    我并没有试图找到最有效的方法来sed gzip文件,而是在研究如何有效地分解流水线中的任务,以便将运行时减少到最慢的任务的运行时。

    10M行文件的当前计时如下:

    Testing via shell
    
    real    0m31.848s
    user    0m58.946s
    sys     0m1.694s
    
    Testing SerialTest
    
    real    0m59.997s
    user    0m59.263s
    sys     0m1.121s
    
    Testing ParallelExchangerTest
    
    real    0m41.573s
    user    1m3.436s
    sys     0m1.830s
    
    Testing ConcurrentQueueTest
    
    real    0m44.626s
    user    1m24.231s
    sys     0m10.856s
    

    我悬赏10%的Java改进,这是在一个拥有1000万行测试数据的四核系统上实时测量的。当前源位于 Bitbucket .

    5 回复  |  直到 13 年前
        1
  •  6
  •   chinmaya    15 年前

    我个人验证了所花费的时间,似乎阅读所花费的时间不到10%,阅读加处理所花费的时间不到30%。 所以我把ParallelExchangerTest(代码中性能最好的)修改为 只要有2个线程,第一个线程进行读取和替换,第二个线程进行写入。

    下面是要比较的数据(在我的机器上,英特尔双核(不是Core2)运行的Ubuntu1GB内存)

    >通过外壳测试

    实0m41.601s

    用户0m58.604s

    系统0m1.032s

    >测试并行交换机测试

    真1M55.424s

    用户2m14.160s

    系统0m4.768s

    >ParallelExchangerTestmod(2线程)

    实际1M35.524s

    用户1M55.319S

    系统0m3.580s

    我知道字符串处理需要更长的时间,所以我替换了line.repalce 有了matcher.replaceall,我得到了这个数字

    >ParallelExchangerTestmod_regex(2线程)

    实际1M12.781

    用户1M33.382s

    系统0m2.916s

    现在我向前迈了一步,我不是一次读一行,而是读 char[]不同大小的缓冲区,并对其进行计时(使用regexp search/replace,) 我有这些数字

    >测试ParallelExchangerTestmod_regex_buff(一次处理100字节)

    实际1M13.804S

    用户1M32.494S

    系统0m2.676s

    >测试ParallelExchangerTestmod_regex_buff(一次处理500字节)

    实际1M6.286s

    用户1M29.334

    系统0m2.324s

    >测试ParallelExchangerTestmod_regex_buff(一次处理800字节)

    实际1M12.309S

    用户1M33.910s

    系统0m2.476s

    看起来500字节是数据大小的最佳选择。

    我在这里叉了一份零钱

    https://bitbucket.org/chinmaya/java-concurrent_response/

        2
  •  14
  •   cletus    15 年前

    首先,这个过程只会和最慢的一段一样快。如果定时故障是:

    • 枪声:1秒
    • sed:5秒
    • GZIP:1秒

    通过多线程,您将完成 至多 5秒而不是7秒。

    其次,与其使用您正在使用的队列,不如尝试复制您正在复制和使用的功能 PipedInputStream PipedOutputStream 把过程连在一起。

    编辑: 有几种用Java并发UTL处理相关任务的方法。把它分成线。首先创建一个公共基类:

    public interface Worker {
      public run(InputStream in, OutputStream out);
    }
    

    这个接口所做的是表示处理输入并生成输出的任意作业。把这些绑在一起,你就有了管道。你也可以把样板抽走。为此,我们需要一门课:

    public class UnitOfWork implements Runnable {
      private final InputStream in;
      private final OutputStream out;
      private final Worker worker;
    
      public UnitOfWork(InputStream in, OutputStream out, Worker worker) {
        if (in == null) {
          throw new NullPointerException("in is null");
        }
        if (out == null) {
          throw new NullPointerException("out is null");
        }
        if (worker == null) {
          throw new NullPointerException("worker is null");
        }
        this.in = in;
        this.out = out;
        this.worker = worker;
      }
    
      public final void run() {
        worker.run(in, out);
      }
    }
    

    例如 Unzip 部分:

    public class Unzip implements Worker {
      protected void run(InputStream in, OutputStream out) {
        ...
      }
    }
    

    等等 Sed Zip . 然后将其结合在一起的是:

    public static void pipe(InputStream in, OutputStream out, Worker... workers) {
      if (workers.length == 0) {
        throw new IllegalArgumentException("no workers");
      }
      OutputStream last = null;
      List<UnitOfWork> work = new ArrayList<UnitOfWork>(workers.length);
      PipedOutputStream last = null;
      for (int i=0; i<workers.length-2; i++) {
        PipedOutputStream out = new PipedOutputStream();
        work.add(new UnitOfWork(
          last == null ? in, new PipedInputStream(last), out, workers[i]);
        last = out;
      }
      work.add(new UnitOfWork(new PipedInputStream(last),
        out, workers[workers.length-1);
      ExecutorService exec = Executors.newFixedThreadPool(work.size());
      for (UnitOfWork w : work) {
        exec.submit(w);
      }
      exec.shutdown();
      try {
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
      } catch (InterruptedExxception e) {
        // do whatever
      }
    }
    

    我不确定你是否能做得更好,而且每个工作都有最少的代码可写。然后你的代码变成:

    public static processFile(String inputName, String outputName) {
      pipe(new FileInputStream(inputFile), new FileOutputStream(outputFile),
        new Zip(), new Sed(), new Unzip());
    }
    
        3
  •  3
  •   Hardcoded    15 年前

    你也可以在爪哇使用管道。它们被实现为流,请参见 PipedInputStream PipedOutputStream 更多细节。

    为了防止堵塞,我建议放置一个支撑管尺寸。

        4
  •  3
  •   Paul Wagland    15 年前

    考虑到你没有说你是如何测量经过的时间的,我假设你使用的是:

    time java org.egge.concurrent.SerialTest < in.gz > out.gz
    time java org.egge.concurrent.ConcurrentQueueTest < in.gz > out.gz
    

    问题是你在这里测量两样东西:

    1. jvm启动需要多长时间,以及
    2. 程序运行需要多长时间。

    您只能通过更改代码来更改第二个。使用你给出的数字:

    Testing SerialTest
    real    0m6.736s
    user    0m6.924s
    sys     0m0.245s
    
    Testing ParallelExchangerTest
    real    0m4.967s
    user    0m7.491s
    sys     0m0.850s
    

    如果我们假设jvm启动需要3秒,那么“程序运行时间”分别是3.7秒和1.9秒,这几乎是100%的加速。我强烈建议您使用更大的数据集进行测试,这样您就可以将jvm启动对计时结果的影响降到最低。

    编辑 :根据您对这个问题的回答,您可能正遭受锁争用的困扰。在Java中解决这个问题的最好方法可能是使用管道阅读器和编写器,从管道中读取,一次一字节,并替换任何一个。 '@' 输入流中的字符 "_at_" 在输出流中。您可能会遇到这样的问题:每个字符串都会被扫描三次,任何替换都需要构建一个新对象,而字符串最终会再次被复制。希望这有帮助…

        5
  •  0
  •   KarlP    15 年前

    减少读取和对象的数量可以使我的性能提高10%以上。

    但是java.util.concurrent的性能仍然有点令人失望。

    ConcurrentQueueTest:

    private static class Reader implements Runnable {
    
    @Override
      public void run() {
       final char buf[] = new char[8192];
       try {
    
        int len;
        while ((len = reader.read(buf)) != -1) {
         pipe.put(new String(buf,0,len));
        }
        pipe.put(POISON);
    
       } catch (IOException e) {
        throw new RuntimeException(e);
       } catch (InterruptedException e) {
        throw new RuntimeException(e);
       }
      }