代码之家  ›  专栏  ›  技术社区  ›  Lev Knoblock

如何实现多线程merkle树哈希

  •  1
  • Lev Knoblock  · 技术社区  · 6 年前

    我有一个大的列表,我想能够在Java中获取root用户的根目录。它足够大,能够对进程进行多线程处理将大大加快它的速度,因此,我一直在尝试这样做。

    这是我目前的代码:

    public static byte[] multiMerkleRoot(ArrayList<byte[]> temp) {
        int count = temp.size();
        List<byte[]> hashList = new ArrayList<>();
    
        for(byte[] o : temp) {
            hashList.add(merkleHash(o));
        }
    
        if (count % 2 == 0) {
            return getRoot(hashList);
        } else {
            return merkleHash(concat(getRoot(hashList.subList(0, hashList.size() - 1)), hashList.get(hashList.size() - 1)));
        }
    }
    
    private static byte[] getRoot(List<byte[]> temp) {
        if(temp.size() % 2 != 0) {
            return merkleHash(concat(getRoot(temp.subList(0, temp.size() - 1)), temp.get(temp.size() - 1)));
        } else {
            if (temp.size() > 2) {
                List<List<byte[]>> subsets = Lists.partition(temp, temp.size() / 2);
    
                return merkleHash(concat(getRoot(subsets.get(0)), getRoot(subsets.get(1))));
            } else {
                return merkleHash(concat(temp.get(0), temp.get(1)));
            }
        }
    }
    
    public static byte[] trueMultiMerkleRoot(ArrayList<byte[]> temp, int threads) {
        try {
            int count = temp.size();
            List<byte[]> hashList = new ArrayList<>();
    
            for(byte[] o : temp) {
                hashList.add(merkleHash(o));
            }
    
            if(count % 2 == 0) {
                byte[] chunk1 = null;
    
                switch(threads) {
                    case 1: chunk1 = getRoot(hashList);
                            break;
                    case 2: chunk1 = twoThreadMerkle(hashList);
                            break;
                    default: System.out.println("You can only have the following threadcounts: 1, 2, 4, 8.");
                            break;
                }
    
                return chunk1;
            } else {
                byte[] chunk1 = null;
                byte[] chunk2 = hashList.get(hashList.size() - 1);
    
                switch(threads) {
                    case 1: chunk1 = getRoot(hashList.subList(0, hashList.size() - 1));
                        break;
                    case 2: chunk1 = twoThreadMerkle(hashList.subList(0, hashList.size() - 1));
                        break;
                    default: System.out.println("You can only have the following threadcounts: 1, 2, 4, 8.");
                        break;
                }
    
                return chunk1;
            }
        } catch(Exception e) {
            return null;
        }
    }
    
    private static byte[] twoThreadMerkle(List<byte[]> temp) throws Exception {
        if (!(temp.size() >= 2)) {
            return twoThreadMerkle(temp);
        } else {
            if(temp.size() % 2 != 0) {
                return getRoot(temp);
            } else {
                List<List<byte[]>> subsets = Lists.partition(temp, temp.size() / 2);
    
                Executor exe1 = Executors.newSingleThreadExecutor();
                Executor exe2 = Executors.newSingleThreadExecutor();
    
                Future<byte[]> fut1 = ((ExecutorService) exe1).submit(() -> getRoot(subsets.get(0)));
                Future<byte[]> fut2 = ((ExecutorService) exe2).submit(() -> getRoot(subsets.get(1)));
    
                while ((!fut1.isDone()) || (!fut2.isDone())) {
                    Thread.sleep(500);
                }
    
                return merkleHash(concat(fut1.get(), fut2.get()));
            }
        }
    }
    

    multimerkleroot是单线程版本,truemultimerkleroot是对多线程版本的尝试。

    我的问题是:不管我使用多大的列表(我试过使用2的精确幂,奇数,偶数,小的和大的),我总是从这两种方法中得到两个不同的答案,而且我一生都不知道如何解决这个问题。

    在这个实现中,merklehash()只是keccak 256的包装器,我使用它来散列连接的两个字节数组。

    如果有人能以任何方式帮助我,无论是告诉我我的代码哪里出了问题以及如何修复它,还是只是点燃我的代码并告诉我如何正确地执行它,我都会非常感谢你的帮助。

    编辑:在我意识到以前的方法有问题之后,我尝试了另一种方法。不过,这一个仍然不会多线程,即使我认为它更接近。

    这是我的新密码:

    package crypto;
    
    import org.bouncycastle.util.encoders.Hex;
    
    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.*;
    
    import static crypto.Hash.keccak256;
    import static util.ByteUtil.concat;
    
    public class Merkle {
        private Queue<byte[]> data;
    
        public Merkle() {
            this.data = new LinkedList<>();
        }
    
        public Merkle(ArrayList<byte[]> in) {
            this.data = new LinkedList<>();
    
            this.data.addAll(in);
        }
    
        public void add(List<byte[]> in) {
            data.addAll(in);
        }
    
        public void add(byte[] in) {
            data.add(in);
        }
    
        public byte[] hash() {
            Queue<byte[]> nextLevel = new LinkedList<>();
    
            while((data.size() > 1) || (nextLevel.size() > 1)) {
                while(data.size() > 0) {
                    if(data.size() > 1) {
                        nextLevel.add(merkleHash(data.remove(), data.remove()));
                    } else {
                        nextLevel.add(data.remove());
                    }
    
                }
    
                data.addAll(nextLevel);
    
                nextLevel.clear();
            }
    
            return data.remove();
        }
    
        private byte[] hash(Queue<byte[]> data) {
            Queue<byte[]> nextLevel = new LinkedList<>();
    
            while((data.size() > 1) || (nextLevel.size() > 1)) {
    
                while(data.size() > 0) {
                    if(data.size() > 1) {
                        nextLevel.add(merkleHash(data.remove(), data.remove()));
                    } else {
                        nextLevel.add(data.remove());
                    }
    
                }
    
                data.addAll(nextLevel);
    
                nextLevel.clear();
            }
    
            return data.remove();
        }
    
        public byte[] dualHash() throws Exception {
            Queue<byte[]> temp1 = new LinkedList<>();
            Queue<byte[]> temp2 = new LinkedList<>();
    
            if(data.size() == Math.pow(2, log2(data.size()))) return hash();
    
            int temponesize = (int)Math.pow(2, log2(data.size()) + 1) / 2;
            while(temp1.size() < temponesize) {
                temp1.add(data.remove());
            }
    
            while(!data.isEmpty()) {
                temp2.add(data.remove());
            }
    
            /*
            ExecutorService exe1 = Executors.newSingleThreadExecutor();
            ExecutorService exe2 = Executors.newSingleThreadExecutor();
            Callable<byte[]> call1 = new Callable<byte[]>() {
                @Override
                public byte[] call() throws Exception {
                    return hash(temp1);
                }
            };
            Callable<byte[]> call2 = new Callable<byte[]>() {
                @Override
                public byte[] call() throws Exception {
                    return hash(temp2);
                }
            };
    
            Future<byte[]> fut1 = exe1.submit(call1);
            Future<byte[]> fut2 = exe2.submit(call2);
            */
    
            byte[] tem1 = hash(temp1);
            byte[] tem2 = hash(temp2);
    
    
    
            return merkleHash(tem1, tem2);
        }
    
        public int size() {
            return data.size();
        }
    
        private byte[] merkleHash(byte[] a, byte[] b) {
            return keccak256(concat(a, b));
        }
    
        private byte[] merkleHash(byte[] a) {
            return keccak256(a);
        }
    
        private int log2(int x) {
            return (int)Math.floor((Math.log(x))/(Math.log(2)));
        }
    }
    

    如果我们具体看一下dualhash方法,在这种情况下,它可以工作,并给出与hash方法相同的结果。但是,当我尝试将它委托给两个线程时,就像这样:

    public byte[] dualHash() throws Exception {
            Queue<byte[]> temp1 = new LinkedList<>();
            Queue<byte[]> temp2 = new LinkedList<>();
    
            if(data.size() == Math.pow(2, log2(data.size()))) return hash();
    
            int temponesize = (int)Math.pow(2, log2(data.size()) + 1) / 2;
            while(temp1.size() < temponesize) {
                temp1.add(data.remove());
            }
    
            while(!data.isEmpty()) {
                temp2.add(data.remove());
            }
    
            ExecutorService exe1 = Executors.newSingleThreadExecutor();
            ExecutorService exe2 = Executors.newSingleThreadExecutor();
            Callable<byte[]> call1 = new Callable<byte[]>() {
                @Override
                public byte[] call() throws Exception {
                    return hash(temp1);
                }
            };
            Callable<byte[]> call2 = new Callable<byte[]>() {
                @Override
                public byte[] call() throws Exception {
                    return hash(temp2);
                }
            };
    
            Future<byte[]> fut1 = exe1.submit(call1);
            Future<byte[]> fut2 = exe2.submit(call2);
    
            byte[] tem1 = fut1.get();
            byte[] tem2 = fut2.get();
    
    
    
            return merkleHash(tem1, tem2);
        }
    

    它不再给我预期的结果。 知道为什么吗?

    谢谢!

    1 回复  |  直到 6 年前
        1
  •  0
  •   Lev Knoblock    6 年前

    找到解决方案!

    结果发现我的代码不是问题所在(至少编辑的代码,我百分之百确定第一个代码块是完全错误的)。问题是,两个线程都在使用MessageDigest的一个实例时试图散列结果。现在我强迫他们使用单独的实例,代码运行得很好。

    推荐文章