我在Java中编写多线程程序,并有一个单独的写入线程运行。一旦线程处理了一块数据,它们就向
LinkedBlockingQueue
在作者线程中通过
synchronized writeToFile
中的方法
writer
.
其思想是,一旦队列达到一定的大小,就阻止线程追加到队列,并将数据输出到文件。我正在处理大量数据(一次20-50GB),这有助于减少所用的RAM。(如果有更好的方法,我愿意接受建议!)
我的问题是尽管
writeToFile
方法已同步,并通过
emptyQueues
在一个
synchonrized
块,当线程正在写入文件时,线程仍附加到队列。
@Component("writer")
public class WriterImpl implements Writer {
private boolean isRunning;
private PrintWriter fastQWriter1, fastQWriter2;
private final Queue<FastQRecord> fastQQueue1 = new LinkedBlockingQueue<>();
private final Queue<FastQRecord> fastQQueue2 = new LinkedBlockingQueue<>();
private final int MAX_QUEUE_SIZE = 5000;
@Override
public void setOutputFiles(File fastQ1, File fastQ2) {
try{
fastQWriter1 = new PrintWriter(new FileOutputStream(fastQ1));
fastQWriter2 = new PrintWriter(new FileOutputStream(fastQ2));
}catch (IOException ioe){
System.out.println(ioe.getMessage());
}
}
@Override
public synchronized void writeToFile(FastQRecord one, FastQRecord two) {
fastQQueue1.add(one);
fastQQueue2.add(two);
}
@Override
public void close() {
isRunning = false;
emptyQueues();
fastQWriter1.flush();
fastQWriter1.close();
fastQWriter2.flush();
fastQWriter2.close();
}
@Override
public void run() {
isRunning = true;
while(isRunning){
//do stuff
if(fastQQueue1.size() > MAX_QUEUE_SIZE){ //empty queues - 5000 record pairs at a time
synchronized (fastQQueue1){
synchronized (fastQQueue2){
emptyQueues();
}
}
}
}
}
private void emptyQueues() {
while(fastQQueue1.size() > 0){
FastQRecord one = fastQQueue1.poll();
fastQWriter1.println(one.getId());
fastQWriter1.println(one.getRawSequence());
fastQWriter1.println(one.getPlus());
fastQWriter1.println(one.getQualityString());
}
while(fastQQueue2.size() > 0){
FastQRecord two = fastQQueue2.poll();
fastQWriter2.println(two.getId());
fastQWriter2.println(two.getRawSequence());
fastQWriter2.println(two.getPlus());
fastQWriter2.println(two.getQualityString());
}
}
}
这个
FastQRecord
只是一个简单的pojo,它保存了我需要写入文件的数据:
public class FastQRecord {
private String id;
private String rawSequence;
private char plus;
private String qualityString;
public FastQRecord(String id, String rawSequence, char plus, String qualityString) {
this.id = id;
this.rawSequence = rawSequence;
this.plus = plus;
this.qualityString = qualityString;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getRawSequence() {
return rawSequence;
}
public void setRawSequence(String rawSequence) {
this.rawSequence = rawSequence;
}
public char getPlus() {
return plus;
}
public void setPlus(char plus) {
this.plus = plus;
}
public String getQualityString() {
return qualityString;
}
public void setQualityString(String qualityString) {
this.qualityString = qualityString;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FastQRecord that = (FastQRecord) o;
return id.equals(that.id);
}
@Override
public int hashCode() {
return id.hashCode();
}
@Override
public String toString() {
return "FastQRecord{" +
"id=" + id + '\n' +
", rawSequence=" + rawSequence + '\n' +
", plus=" + plus + '\n' +
", qualityString=" + qualityString + '\n' +
'}';
}
}