memcache 记录
最近的一项业务当中,有N个客户端会拉取同一个topic下的消息,没5秒拉取一次,一次拉取5条,每一个的topic客户端都在在30个上下,并发相对较高,并且每条记录都只能读一次,这里在重温下memcache的一些基础技巧.
get 和 set的并发问题
业务背景,当前业务的每一条数据都只能被读取一次,仅仅需要这个数据被标记是否读取而已,并不是需要说这个数据以后还要读取
if (memcachedClient.get(key) == null) {
memcachedClient.incr.set(key,30,1)
}
get
获取数据,set
设置数据 , 通常情况下这样的处理是没有问题,但是在多个线程的并发处理当中就会有问题,在get和set的 执行空隙
中,并不是原子的,导致出现的结果是数据被多次读取。
@Test
public void testMemcacheedValue() throws InterruptedException, MemcachedException, TimeoutException {
memcachedClient.set("chen", 120, 0);
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
try {
int chen = memcachedClient.get("chen");
chen += 1;//模拟业务逻辑
memcachedClient.set("chen", 120, chen);
} catch (TimeoutException | InterruptedException | MemcachedException e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(5000L);
Object chen = memcachedClient.get("chen");
System.out.println(chen.toString());
}
执行效果
WARN [main] (CglibAopProxy.java:262) xxx,because it is marked as final: Consider using interface-based JDK proxies instead!
WARN [main] (CglibAopProxy.java:262) -xxx, because it is marked as final: Consider using interface-based JDK proxies instead!
565 //合理的控制这里就是1000了
WARN [Xmemcached-Reactor-4] (MemcachedConnector.java:365) - Remove a session: 127.0.0.1:11211
WARN [Xmemcached-Reactor-5] (MemcachedConnector.java:365) - Remove a session: 127.0.0.1:11211
因为我们当前业务是 需要标记
,而不是需要读取数据,所以可以使用 incr
来进行数据的控制。
/**
* key 操作值
* delta 叠加值
* init 初始值,如果这个key在memcache中没有,那么返回的就是初始化的值
*/
incr(key,delta,init)
改正之后的代码
memcachedClient.set("chen1", 120, "0");
....
new Thread(() -> {
try {
memcachedClient.incr("chen1", 1);
} catch (TimeoutException | InterruptedException | MemcachedException e) {
e.printStackTrace();
}
}).start();
//改正之后的业务代码
if (memcachedClient.incr(ssss, 1, 0L) == 0) {
//业务
}
执行结果
WARN [main] (CglibAopProxy.java:262) -xxx,BeanInitializationException] because it is marked as final: Consider using interface-based JDK proxies instead!
1000
WARN [Xmemcached-Reactor-5] (MemcachedConnector.java:365) - Remove a session: 127.0.0.1:11211
cas获取锁
当前的设计是你的客户端没钱都去争抢一个锁,如果抢到了就继续执行,否则执行返回,等待后续继续拉取任务.这样的设计原因是为了当前代码更容易些,但是也带来了一个问题,就是当topic下的任务越来产生的比拉取的快,那么就需要调节这个锁了,目前需要继续观察量大不大。
- 获取cas变量
- 比较当前是否真处于锁当中,是直接返回,否进入3
- 进行cas的版本比较,如果符合进入4,否则返回
- 业务处理,最后释放锁
Note: 这里不考虑memcache挂的场景.
String sCode = String.valueOf(aLong.hashCode());
GetsResponse<Object> gets = memcachedClient.gets(sCode);
if (gets == null) {
synchronized (lock) {
memcachedClient.set(sCode, 120 * 10, 0L);
gets = memcachedClient.gets(sCode);
}
}
if (gets.getValue().equals(1L)) return;
long cas = gets.getCas();
boolean cas1 = memcachedClient.cas(sCode, 120 * 10, 1L, cas);
if (!cas1) {
return;
}
try {
//业务处理
atomicInteger.getAndAdd(nothing.size());
System.out.println("大小:" + atomicInteger.get() + "\t" + Thread.currentThread().getName() + "\t" + nothing.size());
} finally {
try {
memcachedClient.set(sCode, 120 * 10, 0L);
} catch (TimeoutException | InterruptedException | MemcachedException e) {
//重试
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
如果没有版本值的比较,那么就会有多个线程获取锁,会导致后续出错,所以当前版本值的比较是很有必要的.
junit多线程测试
- 主线程sleep住
- 使用GroboUtils进行测试
2019-03-17