代码之家  ›  专栏  ›  技术社区  ›  membersound

如何在CompletableFuture中保留slf4j MDC日志上下文?

  •  11
  • membersound  · 技术社区  · 6 年前

    执行异步时 CompletableFuture ,父线程上下文,以及 org.slf4j.MDC 上下文丢失。

    这很糟糕,因为我使用某种“鱼标记”来跟踪多个日志文件中一个请求的日志。

    MDC.put("fishid", randomId())

    问题:如何在的任务期间保留该id CompletableFutures 一般来说

    List<CompletableFuture<UpdateHotelAllotmentsRsp>> futures =
        tasks.stream()
            .map(task -> CompletableFuture.supplyAsync(
                () -> businesslogic(task))
            .collect(Collectors.toList());
    
    List results = futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
    
    public void businesslogic(Task task) {
           LOGGER.info("mdc fishtag context is lost here");
    }
    
    4 回复  |  直到 6 年前
        1
  •  19
  •   membersound    5 年前

    我解决这个问题的最具可读性的方法如下-

    ---------------线程utils类--------------------

    public static Runnable withMdc(Runnable runnable) {
        Map<String, String> mdc = MDC.getCopyOfContextMap();
        return () -> {
            MDC.setContextMap(mdc);
            runnable.run();
        };
    }
    
    public static <U> Supplier<U> withMdc(Supplier<U> supplier) {
        Map<String, String> mdc = MDC.getCopyOfContextMap();
        return (Supplier) () -> {
            MDC.setContextMap(mdc);
            return supplier.get();
        };
    }
    

    ---------------使用情况--------------

    CompletableFuture.supplyAsync(withMdc(() -> someSupplier()))
                     .thenRunAsync(withMdc(() -> someRunnable())
                     ....
    

    ThreadUtils中的WithMdc必须重载,以包括CompletableFuture接受的其他功能接口

    请注意,withMdc()方法是静态导入的,以提高可读性。

        2
  •  7
  •   membersound    6 年前

    最后,我创建了一个 Supplier 包裹器固定 MDC 。如果有人有更好的想法,请随时发表评论。

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
        return CompletableFuture.supplyAsync(new SupplierMDC(supplier), executor);
    }
    
    private static class SupplierMDC<T> implements Supplier<T> {
        private final Supplier<T> delegate;
        private final Map<String, String> mdc;
    
        public SupplierMDC(Supplier<T> delegate) {
            this.delegate = delegate;
            this.mdc = MDC.getCopyOfContextMap();
        }
    
        @Override
        public T get() {
            MDC.setContextMap(mdc);
            return delegate.get();
        }
    }
    
        3
  •  2
  •   Laks    5 年前

    我的解决方案主题是to(它将与JDK 9+配合使用,因为自该版本以来,已经公开了两个可重写的方法)

    使整个生态系统了解MDC

    为此,我们需要解决以下情况:

    • 我们什么时候才能从这个类中获得CompletableFuture的新实例? 我们需要返回一个支持MDC的版本。
    • 我们什么时候才能从这个类之外获得CompletableFuture的新实例? 我们需要返回一个支持MDC的版本。
    • 在CompletableFuture类中使用哪个执行器? 在任何情况下,我们都需要确保所有执行人都了解MDC

    为此,让我们创建一个支持MDC的版本类 CompletableFuture 通过扩展它。我的版本如下所示

    import org.slf4j.MDC;
    
    import java.util.Map;
    import java.util.concurrent.*;
    import java.util.function.Function;
    import java.util.function.Supplier;
    
    public class MDCAwareCompletableFuture<T> extends CompletableFuture<T> {
    
        public static final ExecutorService MDC_AWARE_ASYNC_POOL = new MDCAwareForkJoinPool();
    
        @Override
        public CompletableFuture newIncompleteFuture() {
            return new MDCAwareCompletableFuture();
        }
    
        @Override
        public Executor defaultExecutor() {
            return MDC_AWARE_ASYNC_POOL;
        }
    
        public static <T> CompletionStage<T> getMDCAwareCompletionStage(CompletableFuture<T> future) {
            return new MDCAwareCompletableFuture<>()
                    .completeAsync(() -> null)
                    .thenCombineAsync(future, (aVoid, value) -> value);
        }
    
        public static <T> CompletionStage<T> getMDCHandledCompletionStage(CompletableFuture<T> future,
                                                                    Function<Throwable, T> throwableFunction) {
            Map<String, String> contextMap = MDC.getCopyOfContextMap();
            return getMDCAwareCompletionStage(future)
                    .handle((value, throwable) -> {
                        setMDCContext(contextMap);
                        if (throwable != null) {
                            return throwableFunction.apply(throwable);
                        }
                        return value;
                    });
        }
    }
    

    这个 MDCAwareForkJoinPool 类看起来像(跳过了 ForkJoinTask 参数(为简单起见)

    public class MDCAwareForkJoinPool extends ForkJoinPool {
        //Override constructors which you need
    
        @Override
        public <T> ForkJoinTask<T> submit(Callable<T> task) {
            return super.submit(MDCUtility.wrapWithMdcContext(task));
        }
    
        @Override
        public <T> ForkJoinTask<T> submit(Runnable task, T result) {
            return super.submit(wrapWithMdcContext(task), result);
        }
    
        @Override
        public ForkJoinTask<?> submit(Runnable task) {
            return super.submit(wrapWithMdcContext(task));
        }
    
        @Override
        public void execute(Runnable task) {
            super.execute(wrapWithMdcContext(task));
        }
    }
    

    要包装的实用方法如下

    public static <T> Callable<T> wrapWithMdcContext(Callable<T> task) {
        //save the current MDC context
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return () -> {
            setMDCContext(contextMap);
            try {
                return task.call();
            } finally {
                // once the task is complete, clear MDC
                MDC.clear();
            }
        };
    }
    
    public static Runnable wrapWithMdcContext(Runnable task) {
        //save the current MDC context
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return () -> {
            setMDCContext(contextMap);
            try {
                return task.run();
            } finally {
                // once the task is complete, clear MDC
                MDC.clear();
            }
        };
    }
    
    public static void setMDCContext(Map<String, String> contextMap) {
       MDC.clear();
       if (contextMap != null) {
           MDC.setContextMap(contextMap);
        }
    }
    

    以下是一些使用指南:

    • 使用类 MDCAwareCompletableFuture 而不是班级 可完成的未来
    • 类中的几个方法 可完成的未来 实例化自我版本,例如 new CompletableFuture... 。对于此类方法(大多数公共静态方法),请使用其他方法获取 MDCAwareCompletableFuture 。使用替代方案的一个示例可以是 CompletableFuture.supplyAsync(...) ,您可以选择 new MDCAwareCompletableFuture<>().completeAsync(...)
    • 转换的实例 可完成的未来 MDCAwareCompletableFuture 通过使用该方法 getMDCAwareCompletionStage 当您因为某个外部库返回的实例而陷入其中时 可完成的未来 。显然,您不能在该库中保留上下文,但此方法在代码命中应用程序代码后仍会保留上下文。
    • 在提供executor作为参数时,请确保它是MDC感知的,例如 MDCAwareForkJoinPool 。您可以创建 MDCAwareThreadPoolExecutor 通过重写 execute 方法来服务于您的用例。你明白了!

    这样,您的代码就会如下所示

    List<CompletableFuture<UpdateHotelAllotmentsRsp>> futures =
        tasks.stream()
            new MDCAwareCompletableFuture<UpdateHotelAllotmentsRsp>().completeAsync(
                () -> businesslogic(task))
            .collect(Collectors.toList());
    
    List results = futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
    
    public UpdateHotelAllotmentsRsp businesslogic(Task task) {
           LOGGER.info("mdc fishtag context is not lost here");
    }
    

    您可以找到 详细说明 以上所有内容 post 大致相同。

        4
  •  0
  •   Dean Hiller    6 年前

    是的,Twitter Future做到了这一点。他们有一个本地班。scala的未来。scala知道。

    修复程序是让java作者修复这个问题,以便您的本地州遍历所有使用CompletableFutures的库。基本上是本地的。scala由Future使用,在内部使用ThreadLocal直到。然后应用或。然后接受,它将捕获状态并在需要时将其传输到下一个状态。这适用于所有第三方库,第三方库更改为零。

    这里有更多的但是戳Java作者来修复他们的东西。。。 http://mail.openjdk.java.net/pipermail/core-libs-dev/2017-May/047867.html

    在此之前,MDC永远不会通过第三方库工作。

    我的SO帖子 Does CompletableFuture have a corresponding Local context?