
Publishing with MassTransit from middleware

  •  0
  • Ragesh P Raju  · Tech Community  · 5 years ago

    MassTransit configuration in the Startup.cs file:

    services.AddMassTransit(x =>
            {
                x.AddConsumer<CustomLogConsume>();
    
                x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
                {
                    var host = cfg.Host(new Uri("rabbitmq://rabbitmq/"), h =>
                    {
                        h.Username("guest");
                        h.Password("guest");
                    });
    
                    cfg.ExchangeType = ExchangeType.Fanout;
    
                    cfg.ReceiveEndpoint(host, "ActionLog_Queue", e =>
                    {
                        e.PrefetchCount = 16;
                    });
    
                    // or, configure the endpoints by convention
                    cfg.ConfigureEndpoints(provider);
                }));
            });
    

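    I use dependency injection in my solution to keep the code clean. Publishing messages works fine when the publisher is injected into a controller. But when I implemented a custom middleware for logging actions, MassTransit failed to publish the message properly, and an additional queue with the _error suffix was created.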

    public class RequestResponseLoggingMiddleware
    {
        #region Private Variables
    
        /// <summary>
        /// RequestDelegate
        /// </summary>
        private readonly RequestDelegate _next;
    
        /// <summary>
        /// IActionLogPublish
        /// </summary>
        private readonly IActionLogPublish _logPublish;
    
        #endregion
    
        #region Constructor
        public RequestResponseLoggingMiddleware(RequestDelegate next, IActionLogPublish logPublish)
        {
            _next = next;
            _logPublish = logPublish;
        }
        #endregion
    
        #region PrivateMethods
    
        #region FormatRequest
        /// <summary>
        /// FormatRequest
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        private async Task<ActionLog> FormatRequest(HttpRequest request)
        {
            ActionLog actionLog = new ActionLog();
            var body = request.Body;
            request.EnableRewind();
    
            var context = request.HttpContext;
    
            var buffer = new byte[Convert.ToInt32(request.ContentLength)];
            await request.Body.ReadAsync(buffer, 0, buffer.Length);
            var bodyAsText = Encoding.UTF8.GetString(buffer);
            request.Body = body;
    
            var injectedRequestStream = new MemoryStream();
    
            var requestLog = $"REQUEST HttpMethod: {context.Request.Method}, Path: {context.Request.Path}";
    
            using (var bodyReader = new StreamReader(context.Request.Body))
            {
                bodyAsText = bodyReader.ReadToEnd();
    
                if (string.IsNullOrWhiteSpace(bodyAsText) == false)
                {
                    requestLog += $", Body : {bodyAsText}";
                }
    
                var bytesToWrite = Encoding.UTF8.GetBytes(bodyAsText);
                injectedRequestStream.Write(bytesToWrite, 0, bytesToWrite.Length);
                injectedRequestStream.Seek(0, SeekOrigin.Begin);
                context.Request.Body = injectedRequestStream;
            }
    
            actionLog.Request = $"{bodyAsText}";
            actionLog.RequestURL = $"{request.Scheme} {request.Host}{request.Path} {request.QueryString}";
    
            return actionLog;
        }
        #endregion
    
        #region FormatResponse
        private async Task<string> FormatResponse(HttpResponse response)
        {
            response.Body.Seek(0, SeekOrigin.Begin);
            var text = await new StreamReader(response.Body).ReadToEndAsync();
            response.Body.Seek(0, SeekOrigin.Begin);
    
            return $"Response {text}";
        }
        #endregion
    
        #endregion
    
        #region PublicMethods
    
        #region Invoke
        /// <summary>
        /// Invoke - Hits before executing any action. Actions call executes from _next(context)
        /// </summary>
        /// <param name="context"></param>
        /// <returns></returns>
        public async Task Invoke(HttpContext context)
        {
            ActionLog actionLog = new ActionLog();
    
            actionLog = await FormatRequest(context.Request);
    
    
            var originalBodyStream = context.Response.Body;
    
            using (var responseBody = new MemoryStream())
            {
                context.Response.Body = responseBody;
    
                await _next(context);
    
                actionLog.Response = await FormatResponse(context.Response);
    
                await _logPublish.Publish(actionLog);
                await responseBody.CopyToAsync(originalBodyStream);
            }
        }
        #endregion
    
        #endregion
    }
    

    Configuring the middleware at startup:

      public async void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
        {
            ............
            app.UseMiddleware<RequestResponseLoggingMiddleware>();
            ....................
        }
    

    Edit

    The IActionLogPublish publisher:

    public interface IActionLogPublish
    {
        Task Publish(ActionLog model);
    }
    

    public class ActionLogPublish : IActionLogPublish
    {
    
        private readonly IBus _bus;
    
        public ActionLogPublish(IBus bus)
        {
            _bus = bus;
        }
    
        public async Task Publish(ActionLog actionLogData)
        {
            /* Publish values to RabbitMQ Service Bus */
    
            await _bus.Publish(actionLogData);
    
            /* Publish values to RabbitMQ Service Bus */
        }
    
    }
    

    RabbitMQ web console: [screenshot not preserved]

        1
  •  2
  •   Nkosi    5 years ago

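    The middleware needs to put the original body stream back into the response.

    Also, the injected dependency works fine in controllers but not in the middleware, probably because it is registered with a scoped lifetime.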

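    Because middleware is constructed at application startup rather than per request, scoped services should be added to the signature of the Invoke method instead of the constructor. The Invoke method can accept additional parameters that are populated by DI: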

    //...omitted for brevity
    
    public RequestResponseLoggingMiddleware(RequestDelegate next) {
        _next = next;
    }
    
    //...
    
    private async Task<string> FormatResponseStream(Stream stream) {
        stream.Seek(0, SeekOrigin.Begin);
        var text = await new StreamReader(stream).ReadToEndAsync();
        stream.Seek(0, SeekOrigin.Begin);
        return $"Response {text}";
    }
    
    public async Task Invoke(HttpContext context, IActionLogPublish logger) {
        ActionLog actionLog = await FormatRequest(context.Request);
        //keep local copy of response stream
        var originalBodyStream = context.Response.Body;
    
        using (var responseBody = new MemoryStream()) {
            //replace stream for down stream calls
            context.Response.Body = responseBody;
    
            await _next(context);
    
            //put original stream back in the response object
            context.Response.Body = originalBodyStream; // <-- THIS IS IMPORTANT
    
            //Copy local stream to original stream
            responseBody.Position = 0;
            await responseBody.CopyToAsync(originalBodyStream);
    
            //custom logging
            actionLog.Response = await FormatResponseStream(responseBody);
            await logger.Publish(actionLog);
        }
    }
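
    The buffering steps above can be tried in isolation, without ASP.NET Core. The following is a minimal sketch using only System.IO: the names ResponseBufferingDemo, WriteDownstreamAsync and RunAsync are hypothetical, and two MemoryStream instances stand in for the real response stream and the temporary buffer. It shows why the buffer has to be rewound before it is copied, and again before it is read for logging.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class ResponseBufferingDemo
{
    // Hypothetical stand-in for the rest of the pipeline (_next):
    // it writes to whatever stream it is handed.
    private static async Task WriteDownstreamAsync(Stream body)
    {
        var bytes = Encoding.UTF8.GetBytes("hello");
        await body.WriteAsync(bytes, 0, bytes.Length);
    }

    // Returns the text captured for logging and the text delivered to the original stream.
    public static async Task<(string Logged, string Delivered)> RunAsync()
    {
        using (var original = new MemoryStream()) // plays the role of the real response body
        using (var buffer = new MemoryStream())   // temporary stream handed downstream
        {
            await WriteDownstreamAsync(buffer);

            // After writing, the position is at the end; rewind before copying.
            buffer.Position = 0;
            await buffer.CopyToAsync(original);

            // Copying moved the position to the end again; rewind before reading for the log.
            buffer.Position = 0;
            string logged;
            using (var reader = new StreamReader(buffer, Encoding.UTF8, false, 1024, leaveOpen: true))
            {
                logged = await reader.ReadToEndAsync();
            }

            var delivered = Encoding.UTF8.GetString(original.ToArray());
            return (logged, delivered);
        }
    }

    public static async Task Main()
    {
        var (logged, delivered) = await RunAsync();
        Console.WriteLine($"logged={logged}, delivered={delivered}");
    }
}
```

    If either rewind is skipped, the corresponding read starts at the end of the buffer and yields an empty result, which is the same failure mode as a middleware that never restores or copies the response body.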
    

    Reference: Dependency injection in ASP.NET Core: Scoped Service lifetime

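    When using a scoped service in a middleware, inject the service into the Invoke or InvokeAsync method. Don't inject via constructor injection because it forces the service to behave like a singleton. For more information, see Write custom ASP.NET Core middleware.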

    Emphasis mine.

        2
  •  2
  •   Nkosi    5 years ago

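    If IBus works fine from a controller, which is created for each request, you may want to try implementing the IMiddleware interface in your middleware, as described in this doc.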

    public class RequestResponseLoggingMiddleware : IMiddleware
    {
        IActionLogPublish logPublish;
    
        public RequestResponseLoggingMiddleware(IActionLogPublish logPublish)
        {
            this.logPublish = logPublish;
        }
    
        // ...
    
        public async Task InvokeAsync(HttpContext context, RequestDelegate next)
        {
            //...
        }
    
        //...
    }
    

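    In this case the middleware is registered as a scoped or transient service and is resolved for each request, the same as a controller. If the problem is related to resolving scoped services, this should solve it as well.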
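
    A middleware that implements IMiddleware is activated through the container, so it has to be registered there in addition to being added to the pipeline. A minimal sketch of that wiring follows, as a Startup.cs fragment rather than a complete file. The scoped registration of IActionLogPublish is an assumption, since the question does not show how the publisher is registered.

```csharp
// Startup.cs fragment; assumes the types from the question and the answer above.
public void ConfigureServices(IServiceCollection services)
{
    // Assumption: the publisher is registered as scoped (not shown in the question).
    services.AddScoped<IActionLogPublish, ActionLogPublish>();

    // Factory-based (IMiddleware) middleware must itself be registered,
    // otherwise UseMiddleware cannot resolve it at request time.
    services.AddScoped<RequestResponseLoggingMiddleware>();
}

public void Configure(IApplicationBuilder app)
{
    // The pipeline registration is the same as for convention-based middleware.
    app.UseMiddleware<RequestResponseLoggingMiddleware>();
}
```

    With this registration a new middleware instance is resolved per request, so constructor injection of a scoped service is safe here, unlike in convention-based middleware.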