MediatR 是参考中介者模式实现的一个 .NET 工具类库,支持在进程内以单播或多播的形式进行消息传递,通过使用 MediatR
可实现消息的发送和处理充分解耦。
在介绍 MediatR
之前,先简单了解下中介者模式。中介者模式主要是指定义一个中介对象来调度一系列对象之间的交互关系,各对象之间不需要显式的相互引用,降低耦合性。如下对比图(普通模式与中介者模式的区别):
实际上从 MediatR
源代码中可以看出,它本身也并非标准中介者模式的实现,所以这里简单了解即可。接下来将先介绍 MediatR
的两种消息传递方式的使用方式,然后再分析其具体实现。
创建一个 .NET Core Web API 项目并安装 MediatR.Extensions.Microsoft.DependencyInjection
NuGet 包(已含 MediatR
NuGet 包),然后在 ConfigureServices
中注册服务。
// 扫描 Startup 所在程序集内实现了 Handler 的对象并添加到 IoC 容器中
services.AddMediatR(typeof(Startup));
可通过查看 MediatR.Extensions.Microsoft.DependencyInjection 说明了解 AddMediatR
具体包含了哪些服务的注册以及各注册对象的生命周期,基本通过以上一行代码就已经把 MediatR
相关的服务全部注册到 IoC 容器中。
单播消息传递
单播消息传递主要涉及 IRequest
(消息类型) 和 IRequestHandler
(消息处理) 两个接口。
定义接口 IRequest
的实现类,string
指定消息处理方法的返回值类型,如下:
public class GenericRequest : IRequest<string>
{
public string Name { get; set; }
}
定义接口 IRequestHandler
的实现类,GenericRequest
指定此 Handler
要处理的消息类型,string
指定消息处理方法的返回值类型(与 IRequest
指定的泛型类型一致),另外需实现 Handle
方法,如下:
public class GenericRequestHandler : IRequestHandler<GenericRequest, string>
{
public Task<string> Handle(GenericRequest request, CancellationToken cancellationToken)
{
return Task.FromResult($"This is {request.Name}");
}
}
在 Controller 中进行调用测试:
private readonly IMediator _mediator;
public MediatorController(IMediator mediator)
{
_mediator = mediator;
}
[HttpGet]
public async Task<string> GenericRequest()
{
var result = await _mediator.Send(new GenericRequest {
Name = "GenericRequest"
});
return result;
}
另外针对不同的代码实现方式,有其他的 request-types 可选,本质上还是基于 IRequest
和 IRequestHandler
的扩展。
多播消息传递
多播消息传递主要涉及 INotification
(消息类型) 和 INotificationHandler
(消息处理) 两个接口,另外多播消息传递是无返回值的。
定义接口 INotification
的实现类,如下:
public class GenericNotification: INotification
{
public string Name { get; set; }
}
定义接口 INotificationHandler
的实现类,GenericNotification
指定此 Handler
要处理的消息类型,另外需实现 Handle
方法,这里将为此消息类型定义两个 NotificationHandler
实现类,如下:
public class GenericANotificationHandler: INotificationHandler<GenericNotification>
{
public Task Handle(GenericNotification notification, CancellationToken cancellationToken)
{
Console.WriteLine($"A {notification.Name}");
return Task.CompletedTask;
}
}
public class GenericBNotificationHandler: INotificationHandler<GenericNotification>
{
public Task Handle(GenericNotification notification, CancellationToken cancellationToken)
{
Console.WriteLine($"B {notification.Name}");
return Task.CompletedTask;
}
}
在 Controller 中进行调用测试:
[HttpGet]
public async Task GenericNotification()
{
await _mediator.Publish(new GenericNotification {
Name = "GenericNotification"
});
}
原理分析
建议阅读下源码,代码量少且结构清晰,基本理解没什么难度
通过前面的介绍可以了解在 MediatR
中面向开发者的核心接口主要是 IRequest
&IRequestHandler
、INotification
&INotificationHandler
、IMediator
。
如下 IMediator
的实现类 Mediator
中的定义:
public class Mediator: IMediator
{
private readonly ServiceFactory _serviceFactory;
private static readonly ConcurrentDictionary<Type, object> _requestHandlers = new ConcurrentDictionary<Type, object>();
private static readonly ConcurrentDictionary<Type, NotificationHandlerWrapper> _notificationHandlers = new ConcurrentDictionary<Type, NotificationHandlerWrapper>();
}
首先定义了 ServiceFactory
对象,它代表当前应用程序的 IoC 容器,在应用初始化阶段进行了注入,如 MediatR.Extensions.Microsoft.DependencyInjection
已包含了对应的 ServiceFactory 注册。由于 ServiceFactory 可自定义,所以开发中也完全可以选择其他的含 IoC 容器功能的框架,如 Autofac
、Castle Windsor
、DryIoc
等。
另外定义 _requestHandlers
和 _notificationHandlers
分别保存单播和多播消息对象类型对应的 HandlerWrapper
对象,HandlerWrapper
的主要是对 ServiceFactory
对象的传递,最终通过 ServiceFactory
从 IoC 容器中获取对应消息类型的 Handler
对象。MeidatR
还支持为单播消息定义消息处理的 Pipeline
,如通过实现 IRequestPreProcessor
、IRequestPostProcessor
在消息处理前后自定义处理行为,通过实现 IRequestExceptionHandler
、IRequestExceptionAction
在异常时自定义处理行为,这些实现类也是通过 ServiceFactory
从 IoC 容器中获取。
以下是单播消息处理的核心代码:
public override Task<TResponse> Handle(IRequest<TResponse> request, CancellationToken cancellationToken, ServiceFactory serviceFactory)
{
Task<TResponse> Handler() => GetHandler<IRequestHandler<TRequest, TResponse>>(serviceFactory).Handle((TRequest) request, cancellationToken);
return serviceFactory
.GetInstances<IPipelineBehavior<TRequest, TResponse>>()
.Reverse()
.Aggregate((RequestHandlerDelegate<TResponse>) Handler, (next, pipeline) => () => pipeline.Handle((TRequest) request, cancellationToken, next))();
}
首先从 ServiceFactory
获取 IPipelineBehavior
,然后通 Linq 的 Reverse 方法进行顺序颠倒,最后通过 Aggregate 进行委托传递并执行,所以最终执行顺序是 RequestPreProcessorBehavior
→ Handler
→ RequestPostProcessorBehavior
,这里的实现可能较难理解,核心是 Aggregate 的使用。
总结
MediatR
在实现上核心是通过保存消息请求对象与消息处理对象的关系,配合 IoC 容器实现的消息传递解耦。在实际应用中,通过 MediatR
多播消息传递可以使代码实现逻辑上更加简洁,另外也有较多的文章介绍了通过 MediatR
实现 CQRS
、EventBus
等。