Pineline详解
一、pipeline出现的背景:
redis客户端执行一条命令分4个过程:
发送命令-〉命令排队-〉命令执行-〉返回结果
这个过程称为Round trip time(简称RTT, 往返时间),mget mset有效节约了RTT,但大部分命令(如hgetall,并没有mhgetall)不支持批量操作,需要消耗N次RTT ,这个时候需要pipeline来解决这个问题
二、pepeline的性能
1、未使用pipeline执行N条命令

2、使用了pipeline执行N条命令

3、两者性能对比

小结:这是一组统计数据出来的数据,使用Pipeline执行速度比逐条执行要快,特别是客户端与服务端的网络延迟越大,性能体能越明显。
下面贴出测试代码分析两者的性能差异:
@Testpublic void pipeCompare() {Jedis redis = new Jedis("192.168.1.111", 6379);redis.auth("12345678");//授权密码 对应redis.conf的requirepass密码Map<String, String> data = new HashMap<String, String>();redis.select(8);//使用第8个库redis.flushDB();//清空第8个库所有数据// hmsetlong start = System.currentTimeMillis();// 直接hmsetfor (int i = 0; i < 10000; i++) {data.clear(); //清空mapdata.put("k_" + i, "v_" + i);redis.hmset("key_" + i, data); //循环执行10000条数据插入redis}long end = System.currentTimeMillis();System.out.println(" 共插入:[" + redis.dbSize() + "]条 .. ");System.out.println("1,未使用PIPE批量设值耗时" + (end - start) / 1000 + "秒..");redis.select(8);redis.flushDB();// 使用pipeline hmsetPipeline pipe = redis.pipelined();start = System.currentTimeMillis();//for (int i = 0; i < 10000; i++) {data.clear();data.put("k_" + i, "v_" + i);pipe.hmset("key_" + i, data); //将值封装到PIPE对象,此时并未执行,还停留在客户端}pipe.sync(); //将封装后的PIPE一次性发给redisend = System.currentTimeMillis();System.out.println(" PIPE共插入:[" + redis.dbSize() + "]条 .. ");System.out.println("2,使用PIPE批量设值耗时" + (end - start) / 1000 + "秒 ..");//--------------------------------------------------------------------------------------------------// hmgetSet<String> keys = redis.keys("key_*"); //将上面设值所有结果键查询出来// 直接使用Jedis hgetallstart = System.currentTimeMillis();Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();for (String key : keys) {//此处keys根据以上的设值结果,共有10000个,循环10000次result.put(key, redis.hgetAll(key)); //使用redis对象根据键值去取值,将结果放入result对象}end = System.currentTimeMillis();System.out.println(" 共取值:[" + redis.dbSize() + "]条 .. ");System.out.println("3,未使用PIPE批量取值耗时 " + (end - start) / 1000 + "秒 ..");// 使用pipeline hgetallresult.clear();start = System.currentTimeMillis();for (String key : keys) {pipe.hgetAll(key); //使用PIPE封装需要取值的key,此时还停留在客户端,并未真正执行查询请求}pipe.sync(); //提交到redis进行查询end = System.currentTimeMillis();System.out.println(" PIPE共取值:[" + redis.dbSize() + "]条 .. ");System.out.println("4,使用PIPE批量取值耗时" + (end - start) / 1000 + "秒 ..");redis.disconnect();}

三、原生批命令(mset, mget)与Pipeline对比
1、原生批命令是原子性,pipeline是非原子性
(原子性概念:一个事务是一个不可分割的最小工作单位,要么都成功要么都失败。原子操作是指你的一个业务逻辑必须是不可拆分的. 处理一件事情要么都成功,要么都失败,原子不可拆分)
2、原生批命令一命令多个key, 但pipeline支持多命令(存在事务),非原子性
3、原生批命令是服务端实现,而pipeline需要服务端与客户端共同完成
四、Pipeline正确使用方式
使用pipeline组装的命令个数不能太多,不然数据量过大,增加客户端的等待时间,还可能造成网络阻塞,可以将大量命令的拆分多个小的pipeline命令完成。
1、Jedis中的pipeline使用方式
大家知道redis提供了mset、mget方法,但没有提供mdel方法,如果想实现,可以借助pipeline实现。
2、Jedis中的pipeline使用步骤:
- 获取jedis对象(一般从连接池中获取)
- 获取jedis对象的pipeline对象
- 添加指令
- 执行指令
测试类方法:
@Testpublic void testCommond() {// 工具类初始化JedisUtils jedis = new JedisUtils("192.168.1.111", 6379, "12345678");for (int i = 0; i < 100; i++) {// 设值jedis.set("n" + i, String.valueOf(i));}System.out.println("keys from redis return =======" + jedis.keys("*"));}// 使用pipeline批量删除@Testpublic void testPipelineMdel() {// 工具类初始化JedisUtils jedis = new JedisUtils("192.168.1.111", 6379, "12345678");List<String> keys = new ArrayList<String>();for (int i = 0; i < 100; i++) {keys.add("n" + i);}jedis.mdel(keys);System.out.println("after mdel the redis return ---------" + jedis.keys("*"));}
JedisUtils下的mdel方法:
/*** 删除多个字符串key 并释放连接** @param keys** @return 成功返回value 失败返回null*/public boolean mdel(List<String> keys) {Jedis jedis = null;boolean flag = false;try {jedis = pool.getResource();//从连接借用Jedis对象Pipeline pipe = jedis.pipelined();//获取jedis对象的pipeline对象for(String key:keys){pipe.del(key); //将多个key放入pipe删除指令中}pipe.sync(); //执行命令,完全此时pipeline对象的远程调用flag = true;} catch (Exception e) {pool.returnBrokenResource(jedis);e.printStackTrace();} finally {returnResource(pool, jedis);}return flag;}
使用pipeline提交所有操作并返回执行结果:
@Testpublic void testPipelineSyncAll() {// 工具类初始化Jedis jedis = new Jedis("192.168.1.111", 6379);jedis.auth("12345678");// 获取pipeline对象Pipeline pipe = jedis.pipelined();pipe.multi();pipe.set("name", "james"); // 调值pipe.incr("age");// 自增pipe.get("name");pipe.discard();// 将不同类型的操作命令合并提交,并将操作操作以list返回List<Object> list = pipe.syncAndReturnAll();for (Object obj : list) {// 将操作结果打印出来System.out.println(obj);}// 断开连接,释放资源jedis.disconnect();}
五、redis事务
pipeline是多条命令的组合,为了保证它的原子性,redis提供了简单的事务。
1、redis的简单事务,
一组需要一起执行的命令放到multi和exec两个命令之间,其中multi代表事务开始,exec代表事务结束。
2、停止事务discard

3、命令错误,语法不正确,导致事务不能正常结束

4、运行错误,语法正确,但类型错误,事务可以正常结束

5、watch命令:
使用watch后, multi失效,事务失效
WATCH的机制是:在事务EXEC命令执行时,Redis会检查被WATCH的key,只有被WATCH的key从WATCH起始时至今没有发生过变更,EXEC才会被执行。如果WATCH的key在WATCH命令到EXEC命令之间发生过变化,则EXEC命令会返回失败。
小结:redis提供了简单的事务,不支持事务回滚
Redis客户端
一、Jedis基本用法
1、访问流程
连接池建立-〉访问密码设置-〉连接超时等参数设置
2、在pom文件引入相关依赖:
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency>
二、使用方式
1、Jedis直接连接redis如下:

2、使用连接池方式

生产环境一般使用连接池进行操作,jedis连接redis对象放在连接池里,每次用的时候去POOL借用,用完后归还。
3、连接池代码

三、redis简单使用实例
1、生产者
package com.james.redis.queue1;import com.james.redis.utils.JedisUtils;public class RedisProducer {/*** jedis操作List*/public static void main(String[] args){JedisUtils jedis = new JedisUtils("192.168.1.111", 6379,"12345678");for(int i = 0;i<10;i++) {jedis.lpush("informList","orderIdadb_" + i);}}}
2、消费者
package com.james.redis.queue1;public class RedisConsumer {/*** jedis操作List*/public static void main(String[] args){ScheduleMQ mq = new ScheduleMQ();mq.start();}}package com.james.redis.queue1;import java.util.List;import com.james.redis.utils.JedisUtils;class ScheduleMQ extends Thread {JedisUtils jedis = new JedisUtils("192.168.1.111", 6379,"12345678");@Overridepublic void run() {while(true) {//阻塞式brpop,List中无数据时阻塞//参数0表示一直阻塞下去,直到List出现数据List<String> list = jedis.brpop(0, "informList");for(String s : list) {//处理业务逻辑System.out.println(s);}}}}
补充:JedisUtils和剩余的配置等内容可以到此下载:redis简单使用实例
四、序列化与反序列化
1、序列化工具
package com.james.cache.utils;import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.util.List;import com.dyuproject.protostuff.LinkedBuffer;import com.dyuproject.protostuff.ProtostuffIOUtil;import com.dyuproject.protostuff.Schema;import com.dyuproject.protostuff.runtime.RuntimeSchema;/****<pre>* 序列化工具* </pre>** @author James*/public class SerializerUtils {public static <T> byte[] serialize(T obj) {if (obj == null) {throw new RuntimeException("序列化对象(" + obj + ")!");}@SuppressWarnings("unchecked")Schema<T> schema = (Schema<T>) RuntimeSchema.getSchema(obj.getClass());//获得对象的类,并通过对象的类构建对应的schemaLinkedBuffer buffer = LinkedBuffer.allocate(1024 * 1024);byte[] protostuff = null;try {protostuff = ProtostuffIOUtil.toByteArray(obj, schema, buffer);} catch (Exception e) {throw new RuntimeException("序列化(" + obj.getClass() + ")对象(" + obj + ")发生异常!", e);} finally {buffer.clear();}return protostuff;}public static <T> T deserialize(byte[] paramArrayOfByte, Class<T> targetClass) {if (paramArrayOfByte == null || paramArrayOfByte.length == 0) {throw new RuntimeException("反序列化对象发生异常,byte序列为空!");}T instance = null;try {// T message = objenesis.newInstance(cls);instance = targetClass.newInstance();} catch (InstantiationException | IllegalAccessException e) {throw new RuntimeException("反序列化过程中依据类型创建对象失败!", e);}Schema<T> schema = RuntimeSchema.getSchema(targetClass);ProtostuffIOUtil.mergeFrom(paramArrayOfByte, instance, schema);return instance;}public static <T> byte[] serializeList(List<T> objList) {if (objList == null || objList.isEmpty()) {throw new RuntimeException("序列化对象列表(" + objList + ")参数异常!");}@SuppressWarnings("unchecked")Schema<T> schema = (Schema<T>) RuntimeSchema.getSchema(objList.get(0).getClass());LinkedBuffer buffer = LinkedBuffer.allocate(1024 * 1024);byte[] protostuff = null;ByteArrayOutputStream bos = null;try {bos = new ByteArrayOutputStream();ProtostuffIOUtil.writeListTo(bos, objList, schema, buffer);protostuff = bos.toByteArray();} catch (Exception e) {throw new RuntimeException("序列化对象列表(" + objList + ")发生异常!", e);} finally {buffer.clear();try {if(bos!=null){bos.close();}} catch (IOException e) {e.printStackTrace();}}return protostuff;}public static <T> List<T> deserializeList(byte[] paramArrayOfByte, Class<T> targetClass) {if (paramArrayOfByte == null || paramArrayOfByte.length == 0) {throw new RuntimeException("反序列化对象发生异常,byte序列为空!");}Schema<T> schema = RuntimeSchema.getSchema(targetClass);List<T> result = null;try {result = ProtostuffIOUtil.parseListFrom(new ByteArrayInputStream(paramArrayOfByte), schema);} catch (IOException e) {throw new RuntimeException("反序列化对象列表发生异常!",e);}return result;}}
2、实体类
package com.james.cache.entity;import java.io.Serializable;import java.util.Date;public class TCountDetail implements Serializable{private String id;private String ip;private Date optime;private String username;public String getId() {return id;}public void setId(String id) {this.id = id;}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}public Date getOptime() {return optime;}public void setOptime(Date optime) {this.optime = optime;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}}
3、测试类
直接调用序列化工具进行序列化即可
package com.james.cache;import java.io.Serializable;import java.util.Date;import java.util.HashMap;import java.util.Map;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import com.james.cache.entity.TCountDetail;import com.james.cache.utils.JedisUtils;import com.james.cache.utils.SerializerUtils;/** 测试:对象序列化后存redis, 从redis取值后反序列化为JAVA对象*/@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("classpath:applicationContext.xml")public class SerializerTest {@Testpublic void demo(){//工具类初始化JedisUtils jedis = new JedisUtils("192.168.1.111",6379, "12345678");//实体类TCountDetail初始化TCountDetail td = new TCountDetail();td.setId("1");td.setIp("127.0.0.1");td.setOptime(new Date());td.setUsername("james");//将键值序列化byte[] keyBytes = "user:1".getBytes();//序列化byte[] valueBytes = SerializerUtils.serialize(td);//将序列化的数据存入redisjedis.setSerializer(keyBytes, valueBytes);//从redis获取经序列化后的数据byte[] resultBytes = jedis.getSerializer(keyBytes);//反序列化,还原成对象TCountDetail obj = SerializerUtils.deserialize(resultBytes, TCountDetail.class);System.out.println("======="+obj.getUsername());}}
