代码之家  ›  专栏  ›  技术社区  ›  Mark Baker

如何处理Java中的多个流?

  •  4
  • Mark Baker  · 技术社区  · 16 年前

    我正在尝试运行一个进程,并处理它的输入、输出和错误流。最明显的方法是使用 select() 但我在爪哇唯一能找到的就是 Selector.select() ,需要一个 Channel . 似乎不可能 通道 从一个 InputStream OutputStream ( FileStream 有一个 getChannel() 方法,但在这里没有帮助)

    因此,我编写了一些代码来轮询所有流:

    while( !out_eof || !err_eof )
    {
        while( out_str.available() )
        {
            if( (bytes = out_str.read(buf)) != -1 )
            {
                // Do something with output stream
            }
            else
                out_eof = true;
        }
        while( err_str.available() )
        {
            if( (bytes = err_str.read(buf)) != -1 )
            {
                // Do something with error stream
            }
            else
                err_eof = true;
        }
        sleep(100);
    }
    

    这是可行的,除非它永远不会终止。当其中一个流到达文件结尾时, available() 返回零 read() 没有被调用,我们永远不会得到-1返回,这将指示EOF。

    一种解决方案是使用非阻塞方法检测EOF。我在任何地方都看不到。或者有更好的方法来做我想做的事情吗?

    我在这里看到这个问题: link text 虽然它不能完全满足我的需要,但我可能会使用这个想法,为每个流生成单独的线程,以解决我现在遇到的特定问题。但这肯定不是唯一的方法吗?当然,必须有一种方法不用为每个流使用线程就可以从多个流中读取数据?

    2 回复  |  直到 10 年前
        1
  •  4
  •   Community Erin Dees    7 年前

    如你所说,解决方案 outlined in this Answer 是从进程中读取stdout和stderr的传统方法。每个流都有一个线程是可行的,尽管这有点烦人。

        2
  •  2
  •   Daniel Schneller    16 年前

    实际上,您必须为要监视的每个流生成一个线程。如果您的用例允许组合相关进程的stdout和stderr,那么您只需要一个线程,否则需要两个线程。

    我花了相当长的时间才完成了一个项目,在这个项目中,我必须启动一个外部进程,在它的输出端做一些事情,同时寻找错误和进程终止,并且当Java应用程序的用户取消操作时,也可以终止它。

    我创建了一个相当简单的类来封装监视部分,其run()方法如下所示:

    public void run() {
        BufferedReader tStreamReader = null;
        try {
            while (externalCommand == null && !shouldHalt) {
                logger.warning("ExtProcMonitor("
                               + (watchStdErr ? "err" : "out")
                               + ") Sleeping until external command is found");
                Thread.sleep(500);
            }
            if (externalCommand == null) {
                return;
            }
            tStreamReader =
                    new BufferedReader(new InputStreamReader(watchStdErr ? externalCommand.getErrorStream()
                            : externalCommand.getInputStream()));
            String tLine;
            while ((tLine = tStreamReader.readLine()) != null) {
                logger.severe(tLine);
                if (filter != null) {
                    if (filter.matches(tLine)) {
                        informFilterListeners(tLine);
                        return;
                    }
                }
            }
        } catch (IOException e) {
            logger.logExceptionMessage(e, "IOException stderr");
        } catch (InterruptedException e) {
            logger.logExceptionMessage(e, "InterruptedException waiting for external process");
        } finally {
            if (tStreamReader != null) {
                try {
                    tStreamReader.close();
                } catch (IOException e) {
                    // ignore
                }
            }
        }
    }
    

    在主叫方,情况如下:

        Thread tExtMonitorThread = new Thread(new Runnable() {
    
            public void run() {
                try {
                    while (externalCommand == null) {
                        getLogger().warning("Monitor: Sleeping until external command is found");
                        Thread.sleep(500);
                        if (isStopRequested()) {
                            getLogger()
                                    .warning("Terminating external process on user request");
                            if (externalCommand != null) {
                                externalCommand.destroy();
                            }
                            return;
                        }
                    }
                    int tReturnCode = externalCommand.waitFor();
                    getLogger().warning("External command exited with code " + tReturnCode);
                } catch (InterruptedException e) {
                    getLogger().logExceptionMessage(e, "Interrupted while waiting for external command to exit");
                }
            }
        }, "ExtCommandWaiter");
    
        ExternalProcessOutputHandlerThread tExtErrThread =
                new ExternalProcessOutputHandlerThread("ExtCommandStdErr", getLogger(), true);
        ExternalProcessOutputHandlerThread tExtOutThread =
                new ExternalProcessOutputHandlerThread("ExtCommandStdOut", getLogger(), true);
        tExtMonitorThread.start();
        tExtOutThread.start();
        tExtErrThread.start();
        tExtErrThread.setFilter(new FilterFunctor() {
    
            public boolean matches(Object o) {
                String tLine = (String)o;
                return tLine.indexOf("Error") > -1;
            }
        });
    
        FilterListener tListener = new FilterListener() {
            private boolean abortFlag = false;
    
            public boolean shouldAbort() {
                return abortFlag;
            }
    
            public void matched(String aLine) {
                abortFlag = abortFlag || (aLine.indexOf("Error") > -1);
            }
    
        };
    
        tExtErrThread.addFilterListener(tListener);
        externalCommand = new ProcessBuilder(aCommand).start();
        tExtErrThread.setProcess(externalCommand);
        try {
            tExtMonitorThread.join();
            tExtErrThread.join();
            tExtOutThread.join();
        } catch (InterruptedException e) {
            // when this happens try to bring the external process down 
            getLogger().severe("Aborted because auf InterruptedException.");
            getLogger().severe("Killing external command...");
            externalCommand.destroy();
            getLogger().severe("External command killed.");
            externalCommand = null;
            return -42;
        }
        int tRetVal = tListener.shouldAbort() ? -44 : externalCommand.exitValue();
    
        externalCommand = null;
        try {
            getLogger().warning("command exit code: " + tRetVal);
        } catch (IllegalThreadStateException ex) {
            getLogger().warning("command exit code: unknown");
        }
        return tRetVal;
    

    不幸的是,对于一个独立的可运行的示例,我不必这样做,但这可能会有所帮助。 如果我必须再做一次,我将使用thread.interrupt()方法而不是自制的停止标志(请注意声明它是volatile!)但是我把它留了一段时间。:)