memcache 记录

最近的一项业务当中,有N个客户端会拉取同一个topic下的消息,没5秒拉取一次,一次拉取5条,每一个的topic客户端都在在30个上下,并发相对较高,并且每条记录都只能读一次,这里在重温下memcache的一些基础技巧.

get 和 set的并发问题

业务背景,当前业务的每一条数据都只能被读取一次,仅仅需要这个数据被标记是否读取而已,并不是需要说这个数据以后还要读取

  1. if (memcachedClient.get(key) == null) {
  2. memcachedClient.incr.set(key,30,1)
  3. }

get 获取数据,set 设置数据 , 通常情况下这样的处理是没有问题,但是在多个线程的并发处理当中就会有问题,在get和set的 执行空隙 中,并不是原子的,导致出现的结果是数据被多次读取。

  1. @Test
  2. public void testMemcacheedValue() throws InterruptedException, MemcachedException, TimeoutException {
  3. memcachedClient.set("chen", 120, 0);
  4. for (int i = 0; i < 1000; i++) {
  5. new Thread(() -> {
  6. try {
  7. int chen = memcachedClient.get("chen");
  8. chen += 1;//模拟业务逻辑
  9. memcachedClient.set("chen", 120, chen);
  10. } catch (TimeoutException | InterruptedException | MemcachedException e) {
  11. e.printStackTrace();
  12. }
  13. }).start();
  14. }
  15. Thread.sleep(5000L);
  16. Object chen = memcachedClient.get("chen");
  17. System.out.println(chen.toString());
  18. }

执行效果

  1. WARN [main] (CglibAopProxy.java:262) xxx,because it is marked as final: Consider using interface-based JDK proxies instead!
  2. WARN [main] (CglibAopProxy.java:262) -xxx, because it is marked as final: Consider using interface-based JDK proxies instead!
  3. 565 //合理的控制这里就是1000了
  4. WARN [Xmemcached-Reactor-4] (MemcachedConnector.java:365) - Remove a session: 127.0.0.1:11211
  5. WARN [Xmemcached-Reactor-5] (MemcachedConnector.java:365) - Remove a session: 127.0.0.1:11211

因为我们当前业务是 需要标记 ,而不是需要读取数据,所以可以使用 incr 来进行数据的控制。

  1. /**
  2. * key 操作值
  3. * delta 叠加值
  4. * init 初始值,如果这个key在memcache中没有,那么返回的就是初始化的值
  5. */
  6. incr(key,delta,init)

改正之后的代码

  1. memcachedClient.set("chen1", 120, "0");
  2. ....
  3. new Thread(() -> {
  4. try {
  5. memcachedClient.incr("chen1", 1);
  6. } catch (TimeoutException | InterruptedException | MemcachedException e) {
  7. e.printStackTrace();
  8. }
  9. }).start();
  10. //改正之后的业务代码
  11. if (memcachedClient.incr(ssss, 1, 0L) == 0) {
  12. //业务
  13. }

执行结果

  1. WARN [main] (CglibAopProxy.java:262) -xxx,BeanInitializationException] because it is marked as final: Consider using interface-based JDK proxies instead!
  2. 1000
  3. WARN [Xmemcached-Reactor-5] (MemcachedConnector.java:365) - Remove a session: 127.0.0.1:11211

Note:Using ‘incr’ with spymemcached client

cas获取锁

当前的设计是你的客户端没钱都去争抢一个锁,如果抢到了就继续执行,否则执行返回,等待后续继续拉取任务.这样的设计原因是为了当前代码更容易些,但是也带来了一个问题,就是当topic下的任务越来产生的比拉取的快,那么就需要调节这个锁了,目前需要继续观察量大不大。

  • 获取cas变量
  • 比较当前是否真处于锁当中,是直接返回,否进入3
  • 进行cas的版本比较,如果符合进入4,否则返回
  • 业务处理,最后释放锁

Note: 这里不考虑memcache挂的场景.

  1. String sCode = String.valueOf(aLong.hashCode());
  2. GetsResponse<Object> gets = memcachedClient.gets(sCode);
  3. if (gets == null) {
  4. synchronized (lock) {
  5. memcachedClient.set(sCode, 120 * 10, 0L);
  6. gets = memcachedClient.gets(sCode);
  7. }
  8. }
  9. if (gets.getValue().equals(1L)) return;
  10. long cas = gets.getCas();
  11. boolean cas1 = memcachedClient.cas(sCode, 120 * 10, 1L, cas);
  12. if (!cas1) {
  13. return;
  14. }
  15. try {
  16. //业务处理
  17. atomicInteger.getAndAdd(nothing.size());
  18. System.out.println("大小:" + atomicInteger.get() + "\t" + Thread.currentThread().getName() + "\t" + nothing.size());
  19. } finally {
  20. try {
  21. memcachedClient.set(sCode, 120 * 10, 0L);
  22. } catch (TimeoutException | InterruptedException | MemcachedException e) {
  23. //重试
  24. }
  25. }
  26. } catch (Exception ex) {
  27. ex.printStackTrace();
  28. }

如果没有版本值的比较,那么就会有多个线程获取锁,会导致后续出错,所以当前版本值的比较是很有必要的.

junit多线程测试

  • 主线程sleep住
  • 使用GroboUtils进行测试

2019-03-17