Asynchronous queue-based file writing

  •  0
  • Sam  · Tech Community  · 6 years ago

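    I'm writing a multithreaded program in Java with a separate writer thread running. Once the worker threads have processed a chunk of data, they append it to a LinkedBlockingQueue in the writer thread through the synchronized writeToFile method of writer.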

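    The idea is that once the queue reaches a certain size, the worker threads are blocked from appending to it and the data is written out to file. I'm processing large amounts of data (20-50 GB at a time), and this helps reduce the RAM used. (If there's a better way of doing this, I'm open to suggestions!)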

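    My problem is that although the writeToFile method is synchronized, and the write to file via emptyQueues happens inside a synchronized block, worker threads are still appending to the queue while the writer thread is writing to file.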

    @Component("writer")
    public class WriterImpl implements Writer {
    
    private boolean isRunning;
    private PrintWriter fastQWriter1, fastQWriter2;
    private final Queue<FastQRecord> fastQQueue1 = new LinkedBlockingQueue<>();
    private final Queue<FastQRecord> fastQQueue2 = new LinkedBlockingQueue<>();
    private final int MAX_QUEUE_SIZE = 5000;
    
    @Override
    public void setOutputFiles(File fastQ1, File fastQ2) {
        try{
            fastQWriter1 = new PrintWriter(new FileOutputStream(fastQ1));
            fastQWriter2 = new PrintWriter(new FileOutputStream(fastQ2));
        }catch (IOException ioe){
            System.out.println(ioe.getMessage());
        }
    }
    
    @Override
    public synchronized void writeToFile(FastQRecord one, FastQRecord two) {
        fastQQueue1.add(one);
        fastQQueue2.add(two);
    }
    
    @Override
    public void close() {
        isRunning = false;
    
        emptyQueues();
    
        fastQWriter1.flush();
        fastQWriter1.close();
        fastQWriter2.flush();
        fastQWriter2.close();
    }
    
    @Override
    public void run() {
        isRunning = true;
    
        while(isRunning){
            //do stuff
            if(fastQQueue1.size() > MAX_QUEUE_SIZE){ //empty queues - 5000 record pairs at a time
    
                synchronized (fastQQueue1){
                    synchronized (fastQQueue2){
                        emptyQueues();
                    }
                }
            }
        }
    }
    
    private void emptyQueues() {
        while(fastQQueue1.size() > 0){
            FastQRecord one = fastQQueue1.poll();
    
            fastQWriter1.println(one.getId());
            fastQWriter1.println(one.getRawSequence());
            fastQWriter1.println(one.getPlus());
            fastQWriter1.println(one.getQualityString());
        }
    
        while(fastQQueue2.size() > 0){
    
            FastQRecord two = fastQQueue2.poll();
            fastQWriter2.println(two.getId());
            fastQWriter2.println(two.getRawSequence());
            fastQWriter2.println(two.getPlus());
            fastQWriter2.println(two.getQualityString());
    
        }
    }
    }  
    

    FastQRecord is just a simple POJO that holds the data I need to write to file:

    public class FastQRecord {
    
    private String id;
    private String rawSequence;
    private char plus;
    private String qualityString;
    
    public FastQRecord(String id, String rawSequence, char plus, String qualityString) {
        this.id = id;
        this.rawSequence = rawSequence;
        this.plus = plus;
        this.qualityString = qualityString;
    }
    
    public String getId() {
        return id;
    }
    
    public void setId(String id) {
        this.id = id;
    }
    
    public String getRawSequence() {
        return rawSequence;
    }
    
    public void setRawSequence(String rawSequence) {
        this.rawSequence = rawSequence;
    }
    
    public char getPlus() {
        return plus;
    }
    
    public void setPlus(char plus) {
        this.plus = plus;
    }
    
    public String getQualityString() {
        return qualityString;
    }
    
    public void setQualityString(String qualityString) {
        this.qualityString = qualityString;
    }
    
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
    
        FastQRecord that = (FastQRecord) o;
    
        return id.equals(that.id);
    }
    
    @Override
    public int hashCode() {
        return id.hashCode();
    }
    
    @Override
    public String toString() {
        return "FastQRecord{" +
                "id=" + id + '\n' +
                ", rawSequence=" + rawSequence + '\n' +
                ", plus=" + plus + '\n' +
                ", qualityString=" + qualityString + '\n' +
                '}';
    }
    }
    
    1 answer  |  up to 6 years ago
        1
  •  1
  •   Emanuele Giona    6 years ago

    You can take advantage of BlockingQueue by using the put() method instead of add(), which is the one inherited from Collection.

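    But for threads to block on the put() operation, your queue must know its maximum size, so declare it as LinkedBlockingQueue<>(MAX_QUEUE_SIZE). If you don't specify the queue's maximum capacity, it is assumed to be Integer.MAX_VALUE, so the queue is effectively never full. Note also that put() is declared on BlockingQueue, not on Queue, so the fields have to be typed as BlockingQueue<FastQRecord> for the call to compile.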
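
    To make the difference concrete, here is a minimal sketch of how add() and put() behave on a bounded LinkedBlockingQueue. The class name BoundedQueueDemo, the capacity of 2, and the String elements are made up for illustration; they are not part of the question's code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: a tiny bounded queue standing in for fastQQueue1.
class BoundedQueueDemo {

    // add() does not wait: on a full bounded queue it throws IllegalStateException.
    static boolean addThrowsWhenFull() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.add("a");
        queue.add("b");
        try {
            queue.add("c");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // put() waits: the producer stays parked until a consumer frees a slot.
    static boolean putBlocksUntilSpace() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        try {
            queue.put("a");
            queue.put("b");

            Thread producer = new Thread(() -> {
                try {
                    queue.put("c"); // queue is full, so this call blocks
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();

            producer.join(200);                 // wait briefly; the producer should not finish
            boolean blocked = producer.isAlive();

            queue.take();                       // consumer side: free one slot
            producer.join(2000);                // the pending put() can now complete
            return blocked && !producer.isAlive() && queue.size() == 2;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("add() throws when full: " + addThrowsWhenFull());
        System.out.println("put() blocks until space: " + putBlocksUntilSpace());
    }
}
```

    Because put() throws the checked InterruptedException, a writeToFile that switches to it would need to handle or propagate that exception.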

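    I'd also suggest that you synchronize the check of the queue size (or of the queue being full) with the emptying itself in your run() method, like this: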

    @Override
    public void run() {
        isRunning = true;
    
        while(isRunning){
            //do stuff
            synchronized(fastQQueue1){
                if(fastQQueue1.remainingCapacity() == 0){ //queue is full - empty both queues, 5000 record pairs at a time
                    synchronized (fastQQueue2){ //fastQQueue1's monitor is already held by the outer block
                        emptyQueues();
                    }
                }
            }
        }
    }
    

    A similar change applies to the emptyQueues() method as well.
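
    As one possible shape for that change, the sketch below empties a BlockingQueue with drainTo() instead of looping on size() and poll(), then writes each record's four lines. It is only an illustration under assumptions: DrainSketch, Rec (a cut-down stand-in for FastQRecord) and drainAndWrite are hypothetical names, and the real emptyQueues() would do this for both queues and both writers.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DrainSketch {

    // Hypothetical stand-in for FastQRecord, reduced to the four printed fields.
    static final class Rec {
        final String id;
        final String rawSequence;
        final char plus;
        final String qualityString;

        Rec(String id, String rawSequence, char plus, String qualityString) {
            this.id = id;
            this.rawSequence = rawSequence;
            this.plus = plus;
            this.qualityString = qualityString;
        }
    }

    // Moves everything currently queued into a local batch in one call,
    // then writes the batch. Returns the number of records written.
    static int drainAndWrite(BlockingQueue<Rec> queue, PrintWriter out) {
        List<Rec> batch = new ArrayList<>();
        queue.drainTo(batch); // removes all available elements from the queue
        for (Rec r : batch) {
            out.println(r.id);
            out.println(r.rawSequence);
            out.println(r.plus);
            out.println(r.qualityString);
        }
        out.flush();
        return batch.size();
    }

    public static void main(String[] args) {
        BlockingQueue<Rec> queue = new LinkedBlockingQueue<>(5000);
        queue.add(new Rec("@read1", "ACGT", '+', "IIII"));
        queue.add(new Rec("@read2", "TTGA", '+', "JJJJ"));

        StringWriter sink = new StringWriter(); // stands in for the output file
        int written = drainAndWrite(queue, new PrintWriter(sink));
        System.out.println("records written: " + written);
        System.out.print(sink);
    }
}
```

    Draining into a local list first means the queue frees its slots right away, so producers blocked in put() can continue while the batch is being written.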