现在你已经看到了所有构成部分是如何工作的,我们可以把它们放在一起做一些有用的事情。

业务服务的执行有时会因为并发问题而失败(例如,死锁失败者)。如果操作被重试,那么它很可能在下一次尝试中成功。对于在这种情况下适合重试的业务服务(不需要回到用户那里解决冲突的 idempotent 操作),我们希望透明地重试操作,以避免客户端看到 PessimisticLockingFailureException。这是一个明显跨越服务层中多个服务的需求,因此,非常适合通过一个切面来实现。

因为我们想重试操作,所以我们需要使用 环绕通知,以便我们可以多次调用 Proceed。下面的列表显示了基本切面的实现:

  1. @Aspect
  2. public class ConcurrentOperationExecutor implements Ordered {
  3. private static final int DEFAULT_MAX_RETRIES = 2;
  4. private int maxRetries = DEFAULT_MAX_RETRIES;
  5. private int order = 1;
  6. public void setMaxRetries(int maxRetries) {
  7. this.maxRetries = maxRetries;
  8. }
  9. public int getOrder() {
  10. return this.order;
  11. }
  12. public void setOrder(int order) {
  13. this.order = order;
  14. }
  15. @Around("com.xyz.myapp.CommonPointcuts.businessService()")
  16. public Object doConcurrentOperation(ProceedingJoinPoint pjp) throws Throwable {
  17. int numAttempts = 0;
  18. PessimisticLockingFailureException lockFailureException;
  19. do {
  20. numAttempts++;
  21. try {
  22. return pjp.proceed();
  23. }
  24. catch(PessimisticLockingFailureException ex) {
  25. lockFailureException = ex;
  26. }
  27. } while(numAttempts <= this.maxRetries);
  28. throw lockFailureException;
  29. }
  30. }

注意,这个切面实现了 Ordered 接口,这样我们就可以把切面的优先级设置得比事务 advice 高(我们希望每次重试都是一个新的事务)。 maxRetries 和 order 属性都是由 Spring 配置的。主要的动作发生在 advice 周围的 doConcurrentOperation 中。请注意,就目前而言,我们将重试逻辑应用于每个 businessService()。我们尝试进行,如果失败了,出现 PessimisticLockingFailureException,我们就再试一次,除非我们已经用尽了所有的重试尝试。

对应的 Spring 配置如下:

  1. <aop:aspectj-autoproxy/>
  2. <bean id="concurrentOperationExecutor" class="com.xyz.myapp.service.impl.ConcurrentOperationExecutor">
  3. <property name="maxRetries" value="3"/>
  4. <property name="order" value="100"/>
  5. </bean>

为了细化这个切面,使它只重试幂等操作,我们可以定义下面的冥等注解:

  1. @Retention(RetentionPolicy.RUNTIME)
  2. public @interface Idempotent {
  3. // marker annotation
  4. }

然后我们可以使用注解来注解服务操作的实现。对只重试冥等操作的切面的修改涉及到细化 pointcut 表达式,以便只有 @Idempotent 可以匹配,如下所示:

  1. @Around("com.xyz.myapp.CommonPointcuts.businessService() && " +
  2. "@annotation(com.xyz.myapp.service.Idempotent)")
  3. public Object doConcurrentOperation(ProceedingJoinPoint pjp) throws Throwable {
  4. // ...
  5. }

下面是这个例子

切面编写

  1. package cn.mrcode.study.springdocsread.aspect;
  2. import org.aspectj.lang.ProceedingJoinPoint;
  3. import org.aspectj.lang.annotation.Around;
  4. import org.aspectj.lang.annotation.Aspect;
  5. import org.springframework.core.Ordered;
  6. import org.springframework.stereotype.Component;
  7. /**
  8. * @author mrcode
  9. */
  10. @Aspect
  11. @Component
  12. public class ConcurrentOperationExecutor implements Ordered {
  13. private static final int DEFAULT_MAX_RETRIES = 2;
  14. /**
  15. * 最大重试次数
  16. */
  17. private int maxRetries = DEFAULT_MAX_RETRIES;
  18. private int order = 1;
  19. public void setMaxRetries(int maxRetries) {
  20. this.maxRetries = maxRetries;
  21. }
  22. public int getOrder() {
  23. return this.order;
  24. }
  25. public void setOrder(int order) {
  26. this.order = order;
  27. }
  28. @Around("execution(* cn.mrcode.study.springdocsread.web.DemoService.*(..))" +
  29. "&& @annotation(cn.mrcode.study.springdocsread.aspect.Idempotent)")
  30. public Object doConcurrentOperation(ProceedingJoinPoint pjp) throws Throwable {
  31. // 当前执行次数
  32. int numAttempts = 0;
  33. PessimisticLockingFailureException lockFailureException;
  34. do {
  35. numAttempts++;
  36. try {
  37. // 如果执行成功,则会直接返回执行的结果
  38. // 如果执行失败,则会继续走这个循环,直到达到最大重试次数
  39. return pjp.proceed();
  40. } catch (PessimisticLockingFailureException ex) {
  41. lockFailureException = ex;
  42. }
  43. System.out.println("重试次数:" + numAttempts);
  44. } while (numAttempts <= this.maxRetries);
  45. throw lockFailureException;
  46. }
  47. }

异常类

  1. package cn.mrcode.study.springdocsread.aspect;
  2. /**
  3. * @author mrcode
  4. */
  5. public class PessimisticLockingFailureException extends RuntimeException{
  6. }

冥等注解

  1. package cn.mrcode.study.springdocsread.aspect;
  2. import java.lang.annotation.Retention;
  3. import java.lang.annotation.RetentionPolicy;
  4. /**
  5. * 冥等注解标识
  6. * @author mrcode
  7. */
  8. @Retention(RetentionPolicy.RUNTIME)
  9. public @interface Idempotent {
  10. // marker annotation
  11. }

要切入点的服务类

  1. package cn.mrcode.study.springdocsread.web;
  2. import org.springframework.stereotype.Component;
  3. import cn.mrcode.study.springdocsread.aspect.Idempotent;
  4. import cn.mrcode.study.springdocsread.aspect.PessimisticLockingFailureException;
  5. /**
  6. * @author mrcode
  7. */
  8. @Component
  9. public class DemoService {
  10. @Idempotent
  11. public void test() {
  12. // 这里对当前时间进行取模,也就是说会概率性的抛出异常
  13. if (System.currentTimeMillis() % 2 == 0) {
  14. throw new PessimisticLockingFailureException();
  15. }
  16. }
  17. public void test(String name) {
  18. }
  19. }

开启 aspect 支持

  1. package cn.mrcode.study.springdocsread.web;
  2. import org.springframework.context.annotation.ComponentScan;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.context.annotation.EnableAspectJAutoProxy;
  5. /**
  6. * @author mrcode
  7. */
  8. @Configuration
  9. @ComponentScan("cn.mrcode.study.springdocsread")
  10. @EnableAspectJAutoProxy
  11. public class AppConfig {
  12. }

启动类

  1. package cn.mrcode.study.springdocsread;
  2. import org.springframework.context.annotation.AnnotationConfigApplicationContext;
  3. import cn.mrcode.study.springdocsread.web.AppConfig;
  4. import cn.mrcode.study.springdocsread.web.DemoService;
  5. public class TestDemo {
  6. public static void main(String[] args) {
  7. final AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(AppConfig.class);
  8. final DemoService bean = ctx.getBean(DemoService.class);
  9. bean.test();
  10. }
  11. }

对于那个切入点,在 idea 中有图形化的标志,有下面这个图标的方法,表示被增强了,可以点击它跳转到增强的切面中
image.png

运行这个测试方式会看到以下的情况,全部失败,最终抛出异常:

  1. 重试次数:1
  2. 重试次数:2
  3. Exception in thread "main" 重试次数:3
  4. cn.mrcode.study.springdocsread.aspect.PessimisticLockingFailureException
  5. at cn.mrcode.study.springdocsread.web.DemoService.test(DemoService.java:15)
  6. at cn.mrcode.study.springdocsread.web.DemoService$$FastClassBySpringCGLIB$$77b34426.invoke(<generated>)
  7. at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
  8. at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:783)
  9. at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
  10. at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:753)
  11. at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:89)
  12. at cn.mrcode.study.springdocsread.aspect.ConcurrentOperationExecutor.doConcurrentOperation(ConcurrentOperationExecutor.java:45)
  13. ...
  14. at cn.mrcode.study.springdocsread.web.DemoService$$EnhancerBySpringCGLIB$$a20b230b.test(<generated>)
  15. at cn.mrcode.study.springdocsread.TestDemo.main(TestDemo.java:17)

想要看到被重试后,能成功的情况,就需要稍微修改下测试逻辑了

  1. @Idempotent
  2. public void test() throws InterruptedException {
  3. // 随机休眠几毫秒
  4. TimeUnit.MICROSECONDS.sleep(new Random().nextInt(10));
  5. if (System.currentTimeMillis() % 2 == 0) {
  6. throw new PessimisticLockingFailureException();
  7. }
  8. }

经过几次尝试运行后,你会看到有一次重试,这次重试后,方法就执行成功了

  1. 重试次数:1