介绍

ddd-cqrs插件模块是lzy-midway-ddd项目框架的核心模块,是实现落地领域驱动设计、命令查询职责分离与事件驱动的基石。在这个插件模块中,提供了各种基类、命令总线、查询总线、事件总线、通用仓储、工作单元、异常处理等各种支持。
在之前快速入门开发中已经演示了如何使用,这里就不再赘述。本篇文章就来讲讲这个插件模块中一些实现原理。

原理解析

订阅命令、发送命令

订阅命令是由自定义装饰器实现的,midwayjs提供了自定义装饰器的功能,有兴趣的小伙伴可以阅读一下这篇文档
先来看下订阅命令装饰器的源码:

  1. // plugins\ddd-cqrs\src\decorator\command.ts
  2. import {
  3. Scope,
  4. ScopeEnum,
  5. saveClassMetadata,
  6. saveModule,
  7. } from '@midwayjs/decorator';
  8. const MODULE_KEY = 'decorator:subscribeCommand';
  9. export function SubscribeCommand<C extends any>(cmdClazz: C): ClassDecorator {
  10. return (target: any) => {
  11. // 将装饰的类,绑定到该装饰器,用于后续能获取到 class
  12. saveModule(MODULE_KEY, target);
  13. // 保存一些元数据信息,任意你希望存的东西
  14. saveClassMetadata(
  15. MODULE_KEY,
  16. {
  17. cmdArray: cmdClazz instanceof Array ? cmdClazz : [cmdClazz],
  18. },
  19. target
  20. );
  21. // 指定 IoC 容器创建实例的作用域,这里注册为请求作用域,这样能取到 ctx
  22. Scope(ScopeEnum.Prototype)(target);
  23. };
  24. }

订阅命令装饰器的使用场景是命令执行者需要订阅某个命令才会用到,接下来演示如何使用这个装饰器:

  1. @SubscribeCommand([RegisterUserCommand]) //这里订阅了一个注册用户命令
  2. @Provide()
  3. export class UserCommandExecutor extends SuperCommandExecutor {
  4. @Inject()
  5. userChecker: IUserChecker;
  6. userRepository: CommonRepository<User>;
  7. public async init(repositoryManager: RepositoryManager): Promise<void> {
  8. this.userRepository = await repositoryManager.get(User);
  9. }
  10. public async executeCommand<C extends SuperCommand>(
  11. command: C
  12. ): Promise<void> {
  13. if (command instanceof RegisterUserCommand) {
  14. await this.registerUser(command);
  15. }else{
  16. throw new Error('未实现业务逻辑');
  17. }
  18. }
  19. private async registerUser(command: RegisterUserCommand): Promise<void> {
  20. const user = await User.create(User, command, this.userChecker);
  21. await this.userRepository.add(user);
  22. }
  23. }

接下来是将使用@SubscribeCommand装饰器订阅命令的执行者收集起来:

  1. // src/configuration.ts
  2. import {
  3. App,
  4. Configuration,
  5. getClassMetadata,
  6. Inject,
  7. listModule
  8. } from '@midwayjs/decorator';
  9. console.log('DddCqrsConfiguration');
  10. import * as mongoContext from '@lzy-plugin/mongo-context';
  11. import { join } from 'path';
  12. import { IMidwayContainer } from '@midwayjs/core';
  13. import { ICommandBus } from './core/bus/command-bus';
  14. import { Application } from 'egg';
  15. @Configuration({
  16. imports: [mongoContext],
  17. namespace: 'ddd-cqrs',
  18. importConfigs: [join(__dirname, 'config')]
  19. })
  20. export class DddCqrsConfiguration {
  21. @Inject()
  22. commandBus: ICommandBus;
  23. @Inject()
  24. eventBus: IEventBus;
  25. @Inject()
  26. queryBus: IQueryBus;
  27. @App()
  28. app: Application;
  29. async onReady(container: IMidwayContainer) {
  30. const MODULE_SUBSCRIBECOMMAND_KEY = 'decorator:subscribeCommand';
  31. // 可以获取到所有装饰了 @SubscribeCommand 装饰器的 class
  32. const commandModules = listModule(MODULE_SUBSCRIBECOMMAND_KEY);
  33. for (const mod of commandModules) {
  34. const metaData = getClassMetadata(MODULE_SUBSCRIBECOMMAND_KEY, mod);
  35. // 获取元数据
  36. if (metaData.cmdArray instanceof Array) {
  37. for (const cmdClazz of metaData.cmdArray) {
  38. this.commandBus.subscribe(cmdClazz, mod);//调用命令总线subscribe(),自动订阅命令
  39. }
  40. }
  41. }
  42. }
  43. }

登登登登~,命令总线首次登场。命令总线是实现命令请求的核心,它主要提供了两个方法,subscribe和send,前者负责订阅命令、后者负责发送命令给执行者执行。
以下是命令总线的源码:

  1. // plugins\ddd-cqrs\src\core\bus\command-bus.ts
  2. import { IMidwayApplication } from '@midwayjs/core';
  3. import { App, Init, Provide, Scope, ScopeEnum } from '@midwayjs/decorator';
  4. import { validate } from 'class-validator';
  5. import { current } from 'node-zone';
  6. import { ValidateException } from '../../exception/validate-exception';
  7. import { ISendOptions, IUnitOfWork } from '../../interface';
  8. import { RepositoryManager } from '../../repository/manager';
  9. import { SuperCommand, SuperCommandExecutor } from '../cqrs/command';
  10. import { MongoUnitOfWork } from '../other/mongo-uow';
  11. export interface ICommandBus {
  12. executorMap: Map<new () => SuperCommand, new () => SuperCommandExecutor>;
  13. /**
  14. * 订阅命令
  15. */
  16. subscribe<C extends SuperCommand, E extends SuperCommandExecutor>(
  17. commandClazz: new () => C,
  18. executorClazz: new () => E
  19. ): void;
  20. /**
  21. * 发送命令
  22. * @param command
  23. */
  24. send<C extends SuperCommand>(
  25. command: C,
  26. options: ISendOptions
  27. ): Promise<void>;
  28. index: number;
  29. }
  30. /**
  31. * 命令总线
  32. * 职责:
  33. * 1)负责管理命令执行者订阅命令
  34. * 2)发送命令给命令执行者执行
  35. */
  36. @Provide()
  37. @Scope(ScopeEnum.Singleton)
  38. export class CommandBus implements ICommandBus {
  39. index: number;
  40. @App()
  41. app: IMidwayApplication;
  42. @Init()
  43. init(): void {
  44. this.index = 0;
  45. }
  46. //收集归纳命令执行者
  47. executorMap: Map<
  48. new () => SuperCommand,
  49. new () => SuperCommandExecutor
  50. > = new Map<new () => SuperCommand, new () => SuperCommandExecutor>();
  51. //订阅命令
  52. subscribe<C extends SuperCommand, E extends SuperCommandExecutor>(
  53. commandClazz: new () => C,
  54. executorClazz: new () => E
  55. ): void {
  56. if (!this.executorMap.has(commandClazz)) {
  57. this.executorMap.set(commandClazz, executorClazz);
  58. } else {
  59. throw new Error(
  60. `命令<${commandClazz.name}>存在多个执行者[${
  61. this.executorMap.get(commandClazz).name
  62. },${executorClazz.name}]`
  63. );
  64. }
  65. }
  66. async send<C extends SuperCommand>(
  67. command: C,
  68. options: ISendOptions
  69. ): Promise<void> {
  70. //执行命令之前验证命令有效性
  71. const _validateOption = {
  72. forbidUnknownValues: true,
  73. always: true,
  74. strictGroups: true
  75. };
  76. const errors = await validate(command, _validateOption);
  77. if (errors.length > 0) {
  78. throw new ValidateException(errors);
  79. }
  80. //从Map中查找命令执行者
  81. const keys = this.executorMap.keys();
  82. const container = this.app.getApplicationContext();
  83. for (const key of keys) {
  84. if (command instanceof key) {
  85. const executorClazz = this.executorMap.get(key);
  86. const zone = current.fork(command.command_id);
  87. const uow: IUnitOfWork = await container.getAsync(MongoUnitOfWork);
  88. zone.data.uow = uow;
  89. zone.data.dbKey = options.dbKey;
  90. return new Promise((resolve, reject) => {
  91. zone.run(async () => {
  92. try {
  93. const executor = await container.getAsync(executorClazz);
  94. const repositoryManager: RepositoryManager = await container.getAsync(
  95. RepositoryManager
  96. );
  97. await executor.init(repositoryManager); //初始化命令执行者
  98. await executor.executeCommand(command); //执行命令
  99. await repositoryManager.commit(); //执行仓储持久化//执行仓储持久化
  100. await uow.commit(); //提交工作单元 即 提交事务
  101. resolve(null);
  102. } catch (e) {
  103. await uow.abort(); //回滚工作单元 即 回滚事务
  104. reject(e);
  105. }
  106. });
  107. });
  108. }
  109. }
  110. throw new Error('找不到该命令的执行者');
  111. }
  112. }

订阅查询、发送查询

订阅查询也是通过自定义装饰器实现的。在查询执行者中,使用@SubscribeQuery装饰器来订阅查询。
代码实现基本上跟实现命令请求差不多,同样会使用一个总线来订阅和发送。
查询总线的源码如下:

// plugins\ddd-cqrs\src\core\bus\query-bus.ts
import { MongoManager } from '@ykx-plugin/mongo-context';
import { IMidwayApplication } from '@midwayjs/core';
import { App, Inject, Provide, Scope, ScopeEnum } from '@midwayjs/decorator';
import { validate } from 'class-validator';
import { ValidateException } from '../../exception/validate-exception';
import { SuperQuery, SuperQueryExecutor } from '../cqrs/query';
import { current } from 'node-zone';
import { ISendOptions } from '../../interface';
export interface IQueryBus {
  executorMap: Map<new () => SuperQuery, new () => SuperQueryExecutor>;
  /**
   * 订阅查询
   */
  subscribe<Q extends SuperQuery, E extends SuperQueryExecutor>(
    queryClazz: new () => Q,
    executorClazz: new () => E
  ): void;
  /**
   * 执行查询
   * @param query
   */
  send<Q extends SuperQuery>(query: Q, options: ISendOptions): Promise<void>;
}

/**
 * 查询总线
 * 职责:
 * 1)负责管理查询执行者订阅查询
 * 2)转发查询,交给查询执行者们处理
 */
@Provide()
@Scope(ScopeEnum.Singleton)
export class QueryBus implements IQueryBus {
  @App()
  app: IMidwayApplication;

  @Inject('mongo-context:mongoManager')
  mongoManager: MongoManager;

  //收集归纳查询执行者
  executorMap: Map<
    new () => SuperQuery,
    new () => SuperQueryExecutor
  > = new Map<new () => SuperQuery, new () => SuperQueryExecutor>();

  //订阅查询
  subscribe<Q extends SuperQuery, E extends SuperQueryExecutor>(
    queryClazz: new () => Q,
    executorClazz: new () => E
  ): void {
    if (!this.executorMap.has(queryClazz)) {
      this.executorMap.set(queryClazz, executorClazz);
    } else {
      throw new Error(
        `查询<${queryClazz.name}>存在多个执行者[${
          this.executorMap.get(queryClazz).name
        },${executorClazz.name}]`
      );
    }
  }
  async send<Q extends SuperQuery>(
    query: Q,
    options: ISendOptions
  ): Promise<any> {
    //执行查询之前验证查询有效性
    const _validateOption = {
      forbidUnknownValues: true,
      always: true,
      strictGroups: true
    };
    const errors = await validate(query, _validateOption);
    if (errors.length > 0) {
      throw new ValidateException(errors);
    }

    const keys = this.executorMap.keys();
    const container = this.app.getApplicationContext();
    for (const key of keys) {
      if (query instanceof key) {
        const zone = current.fork(query.query_id);
        zone.data.dbKey = options.dbKey;
        return new Promise((resolve, reject) => {
          zone.run(async () => {
            try {
              const executorClazz = this.executorMap.get(key);//获取执行者类
              const executor = await container.getAsync(executorClazz);//通过容器获取执行者实例对象
              await executor.init(this.mongoManager);//初始化执行者实例对象
              const result = await executor.executeQuery(query);//执行查询
              resolve(result);//返回查询结果
            } catch (err) {
              reject(err);
            }
          });
        });
      }
    }
    throw new Error('找不到该查询的执行者');
  }
}

订阅事件、发布事件

订阅事件使用的装饰器是@SubscribeEvent,同样的也有一个总线,名为事件总线,同样的有subscribe方法,但不同于命令总线和查询总线的是,send换成了publish。事件可以有多个消费者,一般情况下,发布方不用关心谁处理事件,也不关心事件是否处理成功。
以下是事件总线的源码:

import { App, Provide, Scope, ScopeEnum } from '@midwayjs/decorator';
import { IMidwayApplication } from '@midwayjs/core';
import { DomainEvent, DomainEventHandler } from '../domain/event';
import { IPublishOptions } from '../../interface';

/**
 * 事件总线接口,用于处理订阅缓存和发布处理 一般用于类型来使用
 */
export interface IEventBus {
  handlerMap: Map<new () => DomainEvent, (new () => DomainEventHandler)[]>;
  subscribe<E extends DomainEvent, H extends DomainEventHandler>(
    evtClazz: new () => E,
    evtHandler: new () => H
  ): void;
  publish(evt: DomainEvent, options: IPublishOptions): Promise<void>;
}

/**
 * 事件总线,用于处理订阅缓存和发布处理 整个进程只有一个实例
 */
@Provide()
@Scope(ScopeEnum.Singleton) //作用域设置全局唯一
export class EventBus implements IEventBus {
  @App()
  app: IMidwayApplication;

  handlerMap: Map<
    new () => DomainEvent,
    (new () => DomainEventHandler)[]
  > = new Map<new () => DomainEvent, (new () => DomainEventHandler)[]>();

  public subscribe<E extends DomainEvent, H extends DomainEventHandler>(
    evtClazz: new () => E,
    evtHandler: new () => H
  ): void {
    if (!this.handlerMap.has(evtClazz)) {
      this.handlerMap.set(evtClazz, new Array<new () => DomainEventHandler>());
    }

    this.handlerMap.get(evtClazz).push(evtHandler);
  }

  public async publish(
    evt: DomainEvent,
    options: IPublishOptions
  ): Promise<void> {
    const keys = this.handlerMap.keys();
    let hasHandler = false;
    for (const key of keys) {
      if (evt instanceof key) {
        const eventHandlers = this.handlerMap.get(key);
        hasHandler = true;
        await this.execHandle(evt, eventHandlers);
        break;
      }
    }
    if (hasHandler === false) {
      console.log('找不到事件处理者');
    }
  }

  private async execHandle(
    evt: DomainEvent,
    eventHandlers: Array<new () => DomainEventHandler>
  ): Promise<void> {
    const container = this.app.getApplicationContext();
    for (const handlerClazz of eventHandlers) {
      const handler = await container.getAsync(handlerClazz);
      console.log('找到事件处理者了', handler);
      await handler.handleEvent(evt);
    }
  }
}

工作单元与事务处理

在业务中,一个命令的执行可能需要操作多个聚合的数据,这个时候就需要事务处理了。有些小伙伴就要问了,那工作单元是用来干吗的?
一般情况下,数据库都有提供事务支持。而在业务代码中,需要忘记数据库,我们不可能直接操作数据库提供的事务,所以对数据库的事务进行了封装,而这个封装就叫做工作单元。
利用工作单元,无需考虑背后使用的是什么数据库的事务,提供统一的提交事务或回滚事务。这对后期扩展其他数据库提供了支持。
以下是工作单元的源码:

import { IOptions, MongoContext, MongoUtil } from '@ykx-plugin/mongo-context';
import { Init, Provide, Scope, ScopeEnum } from '@midwayjs/decorator';
import { ClientSession, Connection } from 'mongoose';
import { IUnitOfWork } from '../../interface';

@Scope(ScopeEnum.Prototype)
@Provide()
export class MongoUnitOfWork implements IUnitOfWork {
  uow_id: string;

  private session: ClientSession;

  private _openTransaction: boolean;
  public get openTransaction(): boolean {
    return this._openTransaction;
  }

  constructor() {
    this.uow_id = MongoUtil.newObjectIdToString();
  }

  @Init()
  async initMethod(): Promise<void> {
    const db: Connection = MongoContext.getConnection('admin');
    if (!db) {
      throw new Error('获取数据库连接对象失败');
    }
    this.session = await db.startSession();
  }

  //开启事务
  private startTransaction(): void {
    if (!this.session) {
      throw new Error('工作单元初始化时创建事务会话失败!');
    } else {
      this.session.startTransaction();
      this._openTransaction = true;
    }
  }

  //注册事务
  register(options: IOptions): IOptions {
    if (!this.openTransaction) {
      this.startTransaction();
      console.log('事务已开启!');
    }
    const session = this.session;
    options = { session, ...options };
    return options;
  }

  //提交事务
  async commit(): Promise<void> {
    if (this._openTransaction) {
      await this.session.commitTransaction();
      this.session.endSession();
      // this.session = null;
      // this._openTransaction = false;
      console.log('事务已提交!');
    }
  }

  //回滚事务
  async abort(): Promise<void> {
    if (this._openTransaction) {
      await this.session.abortTransaction();
      this.session.endSession();
      // this.session = null;
      // this._openTransaction = false;
      console.log('事务已回滚!');
    }
  }
}

mongodb的事务比较特殊,需要每个操作的options中添加事务会话实例才能实现事务,为了不影响数据库操作方法,使用了midwayjs提供的方法切面,对数据库的操作方法进行拦截处理。

//plugins\ddd-cqrs\src\aspect\db-crud.aspect.ts
import { MongoContext } from '@ykx-plugin/mongo-context';
import { IMidwayApplication } from '@midwayjs/core';
import {
  App,
  Aspect,
  IMethodAspect,
  Init,
  // Inject,
  JoinPoint,
  Provide
} from '@midwayjs/decorator';
import { getZoneData } from '../common/util';
import { IUnitOfWork } from '../interface';

/**
 * @Aspect使用介绍
 * 文档:https://www.yuque.com/midwayjs/midway_v2/aspect
 * 第一个参数 必填 可以指定Class的方法进行切面 格式:Class | Class[]
 * 第二个参数 可选 指定对哪些方法进行切面 格式:string | ()=>boolean 参照的规范文档:https://github.com/micromatch/picomatch
 * 第三个参数 可选 执行顺序,数字越大,优先度越高 格式:number
 */
@Aspect([MongoContext], '*')
@Provide()
export class DbCrudAspect implements IMethodAspect {
  @App()
  app: IMidwayApplication;

  @Init()
  async initMethod(): Promise<void> {}

  async before(point: JoinPoint): Promise<void> {
    if (/^save|^create|^remove|^update|^get|^list/.test(point.methodName)) {
      const uow: IUnitOfWork = getZoneData('uow');
      if (uow) {
        if (/^save|^create|^remove/.test(point.methodName)) {
          //如果是增删操作,options为作为第二个参数
          point.args[1] = uow.register(point.args[1] || {});
        } else if (/^update/.test(point.methodName)) {
          //如果是改操作,options为作为第三个参数
          point.args[2] = uow.register(point.args[2] || {});
        } else if (/^get|^list/.test(point.methodName) && uow.openTransaction) {
          //如果是查操作,并且有开启事务,那接下来的查询将在事务中进行,避免改查不一致,options为作为第三个参数
          point.args[2] = uow.register(point.args[2] || {});
        }
      }
    }
  }
}

工作单元的粒度被我设计为命令级别,一个命令执行时将创建一个工作单元,命令执行完毕后,提交这个工作单元。
image.png

通用仓储

在快速开发入门文档中,我们这样新增和修改数据的:

//新增用户
const user = await User.create(User, command, this.userChecker);// 创建一个用户
await this.userRepository.add(user);// 调用用户仓储添加用户,执行命令结束后,将自动持久化。

//修改用户
const user = await this.userRepository.get(command._id);
user.changePassword(command.password); //从仓储内部实现了数据变化侦测,所以改完属性值后,命令执行完毕,仓储会自动持久化。

你也许会疑惑,新增或修改数据后是如何持久化的?接下来我们就来看一下通用仓储的源码:

import { IMongoManager, MongoContext } from '@ykx-plugin/mongo-context';
import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/decorator';
import { serialize } from 'class-transformer';
import {
  getClassName,
  getZoneData,
  plainToClassAndValidate
} from '../../common/util';
import { EventBus } from '../../core/bus/event-bus';
import { AggregateRoot } from '../../core/domain/aggregate-root';
import { Entity } from '../../core/domain/entity';
import { ExceptionCodeEnum } from '../../exception/code';
import { CommonException } from '../../exception/common-exception';
import {
  IChangedItem,
  OperationTypes,
  proxyObject
} from '../core/aggregate-observer';

@Provide()
@Scope(ScopeEnum.Prototype)
export class CommonRepository<T extends AggregateRoot> {
  @Inject('mongo-context:mongoManager')
  private mongoManager: IMongoManager;

  private mongoContext: MongoContext;

  @Inject()
  private eventBus: EventBus;

  private modelClazz: new () => T;
  private changedArray: IChangedItem[];
  private serializeMap: Map<string, string>;
  private cacheArray: T[]; //缓存数据

  private deleteArray: Array<Entity>;
  private insertArray: Array<Entity>;
  private modifyArray: Array<Entity>;

  /**
   * 创建实例必须执行初始初始化方法
   * @param modelClazz
   */
  public async init(modelClazz: new () => T): Promise<void> {
    this.modelClazz = modelClazz;
    this.changedArray = new Array<IChangedItem>();
    this.cacheArray = new Array<T>();
    this.serializeMap = new Map<string, string>();
    this.deleteArray = new Array<Entity>();
    this.insertArray = new Array<Entity>();
    this.modifyArray = new Array<Entity>();
    const dbKey: string = getZoneData('dbKey');
    this.mongoContext = await this.mongoManager.getContext(dbKey);
  }

  /**
   * 通过_id从仓储查询查询数据
   * @param _id
   * @returns
   */
  public async get(_id: string): Promise<T> {
    //先在缓存区中查询是否存在,没有再从数据库找
    const isExist = this.cacheArray.find((item) => item._id === _id);
    if (isExist) {
      return isExist; //因为是直接从缓存区拿的数据,所以不需要进行代理侦测变化
    }

    //从数据库查询
    this.mongoContext.switchModel(this.modelClazz.name);
    const result = await this.mongoContext.findById(_id);
    if (!result) {
      return result;
    } else {
      const domainObject = await plainToClassAndValidate(
        this.modelClazz,
        result
      );

      //将数据进行代理侦测变化
      const proxyData = proxyObject(domainObject, this.changedArray);

      //将数据序列化,然后进行缓存
      this.serializeMap.set(proxyData._id, serialize(proxyData));

      //将数据添加到缓存区
      this.cacheArray.push(proxyData);
      return proxyData;
    }
  }

  /**
   * 添加数据到仓储中
   * @param value
   */
  public async add(value: T): Promise<void> {
    //判断缓存区或数据库存在数据,不允许add
    this.mongoContext.switchModel(this.modelClazz.name);
    if (
      this.cacheArray.find((item) => item._id === value._id) ||
      (await this.mongoContext.findById(value._id))
    ) {
      throw new CommonException(
        ExceptionCodeEnum.DB_FAIL_CREATE_UNIQUE,
        'CommonRepository检查_id发现重复,不允许添加重复数据'
      );
    }

    //不存在则先添加到缓存区,提交时直接save即可,不需要监听数据变化
    this.cacheArray.push(value);
  }

  /**
   * 从仓储中移除数据 慎重使用,聚合根最好不要删除
   * @param value
   */
  public async remove(value: Entity): Promise<void>;
  public async remove(value: string | Entity): Promise<void> {
    if (value instanceof Entity) {
      this.addDeleteItem(value);
    } else {
      const entity = await this.get(value);
      this.addDeleteItem(entity);
    }
  }

  //缓存被移除的数据
  private addDeleteItem(item: Entity): void {
    if (!(item instanceof Entity)) {
      return;
    }

    if (!this.deleteArray) {
      this.deleteArray = new Array<Entity>();
    }
    if (
      this.deleteArray.findIndex(
        (deleteItem) => deleteItem._id === item._id
      ) === -1
    ) {
      this.deleteArray.push(item);
    }
  }

  //缓存新增的数据
  private addInsertItem(item: Entity): void {
    if (!(item instanceof Entity)) {
      return;
    }

    if (!this.insertArray) {
      this.insertArray = new Array<Entity>();
    }
    if (
      this.insertArray.findIndex(
        (insertItem) => insertItem._id === item._id
      ) === -1
    ) {
      this.insertArray.push(item);
    }
  }

  //缓存修改的数据
  private addModifyItem(item: Entity): void {
    if (!(item instanceof Entity)) {
      return;
    }

    if (!this.modifyArray) {
      this.modifyArray = new Array<Entity>();
    }

    if (
      this.modifyArray.findIndex(
        (modifyItem) => modifyItem._id === item._id
      ) === -1
    ) {
      this.modifyArray.push(item);
    }
  }

  //持久化操作--保存数据
  private async executeSave(item: Entity): Promise<void> {
    for (const key in item) {
      const value = item[key];
      if (Array.isArray(value)) {
        for (const item of value) {
          await this.executeSave(item);
        }
      } else if (value instanceof Entity) {
        await this.executeSave(value);
      }
    }

    if (item instanceof Entity) {
      this.mongoContext.switchModel(getClassName(item));
      await this.mongoContext.save(item);
    }
  }

  //持久化操作--删除数据
  private async executeDelete(item: Entity): Promise<void> {
    for (const key in item) {
      const value = item[key];
      if (Array.isArray(value)) {
        for (const item of value) {
          await this.executeDelete(item);
        }
      } else if (value instanceof Entity) {
        await this.executeDelete(value);
      }
    }

    if (item instanceof Entity) {
      this.mongoContext.switchModel(getClassName(item));
      await this.mongoContext.remove({ _id: item._id });
    }
  }

  //提交,将缓存起来的被删除数据、新增数据、修改数据进行持久化
  public async commit(): Promise<void> {
    //检查更新列表中哪些实体发生删除和新增
    for (const item of this.changedArray) {
      switch (item.type) {
        case OperationTypes.ADD:
          // if (item.newValue instanceof Entity) {
          //   this.addInsertItem(item.newValue);
          // }
          break;
        case OperationTypes.DELETE:
          if (item.oldValue instanceof Entity) {
            this.addDeleteItem(item.oldValue);
          }
          break;
        case OperationTypes.SET:
          if (item.oldValue instanceof Entity) {
            this.addDeleteItem(item.oldValue);
          }

          // if (item.newValue instanceof Entity) {
          //   this.addInsertItem(item.newValue);
          // }
          break;
      }
    }

    //对比缓存数据与序列化数据的差异,存在差异则加入更细名单
    for (const item of this.cacheArray) {
      if (this.serializeMap.has(item._id)) {
        if (serialize(item) !== this.serializeMap.get(item._id)) {
          this.addModifyItem(item);
        }
      } else {
        this.addInsertItem(item);
      }
    }

    //进行持久化
    const promiseArray = [];
    this.insertArray.forEach((item) => {
      promiseArray.push(this.executeSave(item));
    });
    this.modifyArray.forEach((item) => {
      promiseArray.push(this.executeSave(item));
    });
    this.deleteArray.forEach((item) => {
      promiseArray.push(this.executeDelete(item));
    });

    //并发执行持久化
    await Promise.all(promiseArray);

    //发布领域事件
    this.cacheArray.forEach((item) => {
      item.events.forEach((event) =>
        this.eventBus.publish(event, getZoneData('dbKey'))
      );
    });

    this.init(this.modelClazz);
  }
}

通用仓储中,模仿Vue的数据变化侦测机制,使用Proxy监听数据变化,并发生变化的数据缓存到数组里面去。在变化数组中,根据场景,变化可以分为三种:新增、修改、删除。
实体内部有可能发生实体被替换,那么有可能同时触发三种变化。首先变化源对象发生修改,旧实体编码被替换成新实体编码,旧实体需要删除,新实体需要插入,也就需要执行三条持久化语句了。
注意,只有从仓储get()方法获取的聚合根实体才会被Proxy代理,进行变化侦测。而通过add()新增的聚合根实体,在commit()之前,还没有保存到数据库,所以直接视为新增。
要点:像操作对象实例那样对数据进行修改,最后由通用仓储自动进行持久化操作。

异常处理

在开发中,不推荐使用try…catch捕获异常。因为这样做,异常处理分散到了各个地方,难以管控。如果某个地方使用了try…catch阻止了报错提示,却没有对异常进行处理,一旦出现问题,很难排查。所以我们要对异常进行集中处理。这里我使用了中间件

//plugins\ddd-cqrs\src\middleware\error-handler.ts
import { App, Provide } from '@midwayjs/decorator';
import { IWebMiddleware, IMidwayWebNext } from '@midwayjs/web';
import { Application, Context } from 'egg';
import { ExceptionCodeEnum } from '../exception/code';
import { CommonException } from '../exception/common-exception';
import { ValidateException } from '../exception/validate-exception';

@Provide()
export class ErrorHandlerMiddleware implements IWebMiddleware {
  @App()
  app: Application;
  resolve() {
    return async (ctx: Context, next: IMidwayWebNext): Promise<void> => {
      try {
        await next();

        // //请求执行成功,默认添加状态码
      } catch (err) {
        // 所有的异常都在 app 上触发一个 error 事件,框架会记录一条错误日志
        ctx.app.emit('error', err, ctx);
        let exception: CommonException;
        if (err instanceof CommonException === false) {
          exception = new CommonException(
            ExceptionCodeEnum.UNKNOWN_ERR,
            err.message,
            500
          );
        } else {
          exception = err;
        }

        const error = {
          requestUrl: `${ctx.method} : ${ctx.path}`,
          code: exception.code,
          message: exception.msg,
          messageArray:
            exception instanceof ValidateException
              ? err.msgArray
              : [exception.msg],
          label: exception.label
        };

        // 从 error 对象上读出各个属性,设置到响应中
        ctx.body = error;
        ctx.status = exception.status;
      }
    };
  }
}
//主动抛出异常
throw new Error('故意抛出的异常')
throw new CommonException(ExceptionCodeEnum.UNKNOWN_ERR,'未知错误',500);
throw new ValidateException(ExceptionCodeEnum.UNKNOWN_ERR,'验证失败',500)

程序运行发生的错误或者是主动抛出的异常都会被errorHandlerMaddleware进行捕获处理,不过还没有试过在回调函数抛出异常是否能被中间件捕获异常。
在这个中间件里,我们可以对发生的错误保存成日志,也可以对接钉钉接口,发生提醒消息到钉钉等功能都可以后期慢慢扩展

通用查询

在SuperController上内置了通用查询工具属性—queryUtil,这个专门用来进行单表的通用查询。
在定义接口时,你可以这样做:

@Controller('/users')
@Provide()
export class UserController extends SuperController {
  @Get('/')

  async listUser(@Queries(ALL) queries: any): Promise<void> {
    const params = parseQuery(queries); //将参数转化为符合通查询规范的json结构
    const result = await this.queryUtil.find( //创建通用查询对象,并执行查询
      'admin',
      plainToClass(CommonQuery, params, {
        excludeExtraneousValues: true
      }),
      UserModel
    );
    this.httpHelper.success(result, 'OK');
  }
}

注意 @Queries 装饰器和 @Query 有所区别
Queries 会将相同的 key 聚合到一起,变为数组。当用户访问的接口参数为 /?name=a&name=b 时,@Queries 会返回 {name: [a, b]},而 Query 只会返回 {name: b}
调用接口示例:

const result = await createHttpRequest(app)
  .get('/users')
  .query(
    //添加query参数
    stringifyQuery({
      //将json结构转为url参数
      fields: ['phoneNumber', 'password'], //返回字段
      filter: {
        create_at: {
          $lt: Date.now(),
          $gt: Date.now() - 12 * 3600 * 1000
        },
        $or: [{ phoneNumber: { $regex: '^136' } }]
      }, //筛选参数
      options: {
        sort: {
          create_at: 'desc'
        },
        limit: 10,
        skip: 0
      } //执行参数
    })
  );

fields为字符串数组,每个字符串代表想获取的字段,但不一定能拿得到,因为后端可以设置敏感字段进行过滤。
filter为筛选参数,传参格式基本与mongoose的查询筛选参数一致
options为执行参数,有三个可选参数:sort指定参数进行排序,limit每次查询数量,skip跳过指定数量的数据,