代码之家  ›  专栏  ›  技术社区  ›  Metadata

如何使用API JavaUTI.CONTURNET.WORD而不是在Java中显式创建线程?

  •  1
  • Metadata  · 技术社区  · 6 年前

    我有两个线程在Java程序中并行运行如下:

    // Threading
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                gpTableCount   = getGpTableCount();
            } catch (SQLException e) {
                e.printStackTrace();
            } catch(Exception e) {
                e.printStackTrace();
            }
        }
    }).start();
    
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                hiveTableCount = getHiveTableCount();
            } catch (SQLException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }).start();
    
    while(!(gpTableCount != null && gpTableCount.size() > 0 && hiveTableCount != null && hiveTableCount.size() > 0)) {
        Thread.sleep(5000);
    }
    // Threading
    

    它们都有相同的功能。下面是getHiveTableCount()中的代码。另一种方法与下面的方法稍有不同(一行或两行),但功能保持不变。

    public Map<String, String> getHiveTableCount() throws IOException, SQLException {
        hiveDataMap     = new HashMap<String, String>();
        hiveTableErrs   = new HashMap<String, String>();
        Iterator<String> hiveIterator = filteredList.iterator();
        Connection hiveConnection = DbManager.getHiveConnection();
        PreparedStatement hive_pstmnt = null;
        String hiveExcpnMsg;
        String ssn;
        String hiveMaxUpdTms;
        Long hiveCount;
        String gpHiveRec;
        String[] hiveArray;
        String[] hiveDetails;
        String hiveQuery;
        while(hiveIterator.hasNext()) {
            gpHiveRec   = hiveIterator.next();      
            hiveArray   = gpHiveRec.split(",");     
            hiveDetails = hiveArray[1].split("\\.");
            hiveQuery   = "select '" + hiveDetails[1] + "' as TableName, count(*) as Count, source_system_name, max(xx_last_update_tms) from " + hiveArray[1] + " where source_system_name='" + hiveArray[2] + "' group by source_system_name";
            try {
                hive_pstmnt             = hiveConnection.prepareStatement(hiveQuery);
                ResultSet hiveCountRs   = hive_pstmnt.executeQuery();
                while(hiveCountRs.next()) {
                    hiveCount     = hiveCountRs.getLong(2);
                    ssn           = hiveCountRs.getString(3);
                    hiveMaxUpdTms = hiveCountRs.getTimestamp(4).toString();
                    hiveDataMap.put(hiveDetails[1] + "," + ssn, hiveCount + "," + hiveMaxUpdTms);
                }
            } catch(org.postgresql.util.PSQLException e) {
                hiveExcpnMsg = e.getMessage();
                hiveTableErrs.put(hiveDetails[1] + ": for the SSN: " + hiveArray[2], hiveExcpnMsg + "\n");
            } catch(SQLException e) {
                hiveExcpnMsg = e.getMessage();
                hiveTableErrs.put(hiveDetails[1] + ": for the SSN: " + hiveArray[2], hiveExcpnMsg + "\n");
            } catch(Exception e) {
                hiveExcpnMsg = e.getMessage();
                hiveTableErrs.put(hiveDetails[1] + ": for the SSN: " + hiveArray[2], hiveExcpnMsg + "\n");
            }
        }
        return hiveDataMap;
    }
    

    这两个线程同时运行。我最近在网上看到:

    Future类表示异步计算的未来结果 最终会在未来出现的结果 处理完成。

    我从理论上理解了这个概念,但我不知道如何将java.util.concurrent.future API应用于上述代码,而不是显式地创建线程。 有人能告诉我如何在这些方法上实现多线程吗? getGpTableCount() & getHiveTableCount 使用java.util.concurrent.future API而不是创建线程创建新线程,如新线程(new runnable()?

    3 回复  |  直到 6 年前
        1
  •  1
  •   Shmulik Klein    6 年前

    您正在使用提交任务 Runnable 不允许线程在计算结束时返回值(并导致使用共享变量)的接口- gpTableCount hiveTableCount )

    这个 Callable 接口是稍后添加的,它允许您的任务返回一个值(在您的情况下, Map<String, String> )

    作为直接处理线程的替代方法,并发API引入了 ExecutorService 作为管理线程池并能够异步执行任务的高级对象。

    提交类型的任务时 可调用 遗嘱执行人服务 您期望任务生成一个值,但是由于提交点和计算结束没有耦合,因此 遗嘱执行人服务 会回来的 Future 如果该值不可用,则可以获取该值并阻止。因此, 未来 可用于在不同线程之间进行同步。

    作为替代 遗嘱执行人服务 你也可以看看 FutureTask<V> 这是执行 RunnableFuture<V> :

    此类提供了 未来 ,使用启动和取消计算的方法,查询以查看计算是否完成,并检索计算结果。

    FutureTask可用于包装可调用或可运行的对象。

        2
  •  0
  •   Tomasz    6 年前

    首先,创建最适合您需求的执行器服务,例如:

    ExecutorService ex = Executors.newFixedThreadPool(2);
    

    (更多关于遗嘱执行人: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html )

    然后将runnable对象更改为可调用的,类似于runnable,但返回一个值(有关可调用的更多信息: https://docs.oracle.com/javase/8/docs/api/index.html?java/util/concurrent/Callable.html ):

    Callable<Map<String, String>> callable1 = // your Callable class 
    

    类型参数应与要作为结果返回的类型相同。

    接下来,创建任务列表:

    List<Callable<Map<String, String>>> tasks = new LinkedList<>();
    tasks.add(callable1);
    tasks.add(callable2);
    

    然后执行它们:

    Future<Map<String, String>> results = ex.invokeAll(tasks);
    

    当所有任务都完成时(如果我正确理解您的情况,这是您希望实现的),返回上面的方法,但是完成的任务可以正常终止,也可以通过抛出异常终止。

    最后,关闭执行器服务:

    ex.shutdown();
    
        3
  •  0
  •   Bassem Reda Zohdy    6 年前

    如果您使用的是Java 8 +,您可以使用 CompletableFuture.supplyAsync 简而言之,比如:

    import static java.util.concurrent.CompletableFuture.supplyAsync;
    .....
    Future<Map<String, String>> f= supplyAsync(()->{
        try{
            return getHiveTableCount();
        } catch(Exception e) {
            throw new RuntimeException(e);
        }
    }
    

    完成未来.supplyAsync 将在默认情况下使用 ForkJoinPool.commonPool() 它也有另一个重叠 Executor 如果要使用自己的参数:

    public class CompletableFuture<T>
    extends Object
    implements Future<T>, CompletionStage<T>
    

    也有。

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor)