Pineline详解

一、pipeline出现的背景:

redis客户端执行一条命令分4个过程:

  1. 发送命令-〉命令排队-〉命令执行-〉返回结果

这个过程称为Round trip time(简称RTT, 往返时间),mget mset有效节约了RTT,但大部分命令(如hgetall,并没有mhgetall)不支持批量操作,需要消耗N次RTT ,这个时候需要pipeline来解决这个问题

二、pepeline的性能

1、未使用pipeline执行N条命令

Redis系列(五) Pipeline & 事务 & 客户端 - 图1

2、使用了pipeline执行N条命令

Redis系列(五) Pipeline & 事务 & 客户端 - 图2

3、两者性能对比

Redis系列(五) Pipeline & 事务 & 客户端 - 图3
小结:这是一组统计数据出来的数据,使用Pipeline执行速度比逐条执行要快,特别是客户端与服务端的网络延迟越大,性能体能越明显。
下面贴出测试代码分析两者的性能差异:

  1. @Test
  2. public void pipeCompare() {
  3. Jedis redis = new Jedis("192.168.1.111", 6379);
  4. redis.auth("12345678");//授权密码 对应redis.conf的requirepass密码
  5. Map<String, String> data = new HashMap<String, String>();
  6. redis.select(8);//使用第8个库
  7. redis.flushDB();//清空第8个库所有数据
  8. // hmset
  9. long start = System.currentTimeMillis();
  10. // 直接hmset
  11. for (int i = 0; i < 10000; i++) {
  12. data.clear(); //清空map
  13. data.put("k_" + i, "v_" + i);
  14. redis.hmset("key_" + i, data); //循环执行10000条数据插入redis
  15. }
  16. long end = System.currentTimeMillis();
  17. System.out.println(" 共插入:[" + redis.dbSize() + "]条 .. ");
  18. System.out.println("1,未使用PIPE批量设值耗时" + (end - start) / 1000 + "秒..");
  19. redis.select(8);
  20. redis.flushDB();
  21. // 使用pipeline hmset
  22. Pipeline pipe = redis.pipelined();
  23. start = System.currentTimeMillis();
  24. //
  25. for (int i = 0; i < 10000; i++) {
  26. data.clear();
  27. data.put("k_" + i, "v_" + i);
  28. pipe.hmset("key_" + i, data); //将值封装到PIPE对象,此时并未执行,还停留在客户端
  29. }
  30. pipe.sync(); //将封装后的PIPE一次性发给redis
  31. end = System.currentTimeMillis();
  32. System.out.println(" PIPE共插入:[" + redis.dbSize() + "]条 .. ");
  33. System.out.println("2,使用PIPE批量设值耗时" + (end - start) / 1000 + "秒 ..");
  34. //--------------------------------------------------------------------------------------------------
  35. // hmget
  36. Set<String> keys = redis.keys("key_*"); //将上面设值所有结果键查询出来
  37. // 直接使用Jedis hgetall
  38. start = System.currentTimeMillis();
  39. Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
  40. for (String key : keys) {
  41. //此处keys根据以上的设值结果,共有10000个,循环10000次
  42. result.put(key, redis.hgetAll(key)); //使用redis对象根据键值去取值,将结果放入result对象
  43. }
  44. end = System.currentTimeMillis();
  45. System.out.println(" 共取值:[" + redis.dbSize() + "]条 .. ");
  46. System.out.println("3,未使用PIPE批量取值耗时 " + (end - start) / 1000 + "秒 ..");
  47. // 使用pipeline hgetall
  48. result.clear();
  49. start = System.currentTimeMillis();
  50. for (String key : keys) {
  51. pipe.hgetAll(key); //使用PIPE封装需要取值的key,此时还停留在客户端,并未真正执行查询请求
  52. }
  53. pipe.sync(); //提交到redis进行查询
  54. end = System.currentTimeMillis();
  55. System.out.println(" PIPE共取值:[" + redis.dbSize() + "]条 .. ");
  56. System.out.println("4,使用PIPE批量取值耗时" + (end - start) / 1000 + "秒 ..");
  57. redis.disconnect();
  58. }

Redis系列(五) Pipeline & 事务 & 客户端 - 图4

三、原生批命令(mset, mget)与Pipeline对比

1、原生批命令是原子性,pipeline是非原子性

(原子性概念:一个事务是一个不可分割的最小工作单位,要么都成功要么都失败。原子操作是指你的一个业务逻辑必须是不可拆分的. 处理一件事情要么都成功,要么都失败,原子不可拆分)

2、原生批命令一命令多个key, 但pipeline支持多命令(存在事务),非原子性

3、原生批命令是服务端实现,而pipeline需要服务端与客户端共同完成

四、Pipeline正确使用方式

使用pipeline组装的命令个数不能太多,不然数据量过大,增加客户端的等待时间,还可能造成网络阻塞,可以将大量命令的拆分多个小的pipeline命令完成。

1、Jedis中的pipeline使用方式

大家知道redis提供了mset、mget方法,但没有提供mdel方法,如果想实现,可以借助pipeline实现。

2、Jedis中的pipeline使用步骤:

  • 获取jedis对象(一般从连接池中获取)
  • 获取jedis对象的pipeline对象
  • 添加指令
  • 执行指令

测试类方法:

  1. @Test
  2. public void testCommond() {
  3. // 工具类初始化
  4. JedisUtils jedis = new JedisUtils("192.168.1.111", 6379, "12345678");
  5. for (int i = 0; i < 100; i++) {
  6. // 设值
  7. jedis.set("n" + i, String.valueOf(i));
  8. }
  9. System.out.println("keys from redis return =======" + jedis.keys("*"));
  10. }
  11. // 使用pipeline批量删除
  12. @Test
  13. public void testPipelineMdel() {
  14. // 工具类初始化
  15. JedisUtils jedis = new JedisUtils("192.168.1.111", 6379, "12345678");
  16. List<String> keys = new ArrayList<String>();
  17. for (int i = 0; i < 100; i++) {
  18. keys.add("n" + i);
  19. }
  20. jedis.mdel(keys);
  21. System.out.println("after mdel the redis return ---------" + jedis.keys("*"));
  22. }

JedisUtils下的mdel方法:

  1. /**
  2. * 删除多个字符串key 并释放连接
  3. *
  4. * @param keys*
  5. * @return 成功返回value 失败返回null
  6. */
  7. public boolean mdel(List<String> keys) {
  8. Jedis jedis = null;
  9. boolean flag = false;
  10. try {
  11. jedis = pool.getResource();//从连接借用Jedis对象
  12. Pipeline pipe = jedis.pipelined();//获取jedis对象的pipeline对象
  13. for(String key:keys){
  14. pipe.del(key); //将多个key放入pipe删除指令中
  15. }
  16. pipe.sync(); //执行命令,完全此时pipeline对象的远程调用
  17. flag = true;
  18. } catch (Exception e) {
  19. pool.returnBrokenResource(jedis);
  20. e.printStackTrace();
  21. } finally {
  22. returnResource(pool, jedis);
  23. }
  24. return flag;
  25. }

使用pipeline提交所有操作并返回执行结果:

  1. @Test
  2. public void testPipelineSyncAll() {
  3. // 工具类初始化
  4. Jedis jedis = new Jedis("192.168.1.111", 6379);
  5. jedis.auth("12345678");
  6. // 获取pipeline对象
  7. Pipeline pipe = jedis.pipelined();
  8. pipe.multi();
  9. pipe.set("name", "james"); // 调值
  10. pipe.incr("age");// 自增
  11. pipe.get("name");
  12. pipe.discard();
  13. // 将不同类型的操作命令合并提交,并将操作操作以list返回
  14. List<Object> list = pipe.syncAndReturnAll();
  15. for (Object obj : list) {
  16. // 将操作结果打印出来
  17. System.out.println(obj);
  18. }
  19. // 断开连接,释放资源
  20. jedis.disconnect();
  21. }

五、redis事务

pipeline是多条命令的组合,为了保证它的原子性,redis提供了简单的事务。

1、redis的简单事务,

一组需要一起执行的命令放到multi和exec两个命令之间,其中multi代表事务开始,exec代表事务结束。
Redis系列(五) Pipeline & 事务 & 客户端 - 图5

2、停止事务discard

Redis系列(五) Pipeline & 事务 & 客户端 - 图6

3、命令错误,语法不正确,导致事务不能正常结束

Redis系列(五) Pipeline & 事务 & 客户端 - 图7

4、运行错误,语法正确,但类型错误,事务可以正常结束

Redis系列(五) Pipeline & 事务 & 客户端 - 图8

5、watch命令:

使用watch后, multi失效,事务失效
Redis系列(五) Pipeline & 事务 & 客户端 - 图9

WATCH的机制是:在事务EXEC命令执行时,Redis会检查被WATCH的key,只有被WATCH的key从WATCH起始时至今没有发生过变更,EXEC才会被执行。如果WATCH的key在WATCH命令到EXEC命令之间发生过变化,则EXEC命令会返回失败。

小结:redis提供了简单的事务,不支持事务回滚

Redis客户端

一、Jedis基本用法

1、访问流程

连接池建立-〉访问密码设置-〉连接超时等参数设置

2、在pom文件引入相关依赖:

  1. <dependency>
  2. <groupId>redis.clients</groupId>
  3. <artifactId>jedis</artifactId>
  4. <version>2.9.0</version>
  5. </dependency>

二、使用方式

1、Jedis直接连接redis如下:

Redis系列(五) Pipeline & 事务 & 客户端 - 图10

2、使用连接池方式

Redis系列(五) Pipeline & 事务 & 客户端 - 图11
生产环境一般使用连接池进行操作,jedis连接redis对象放在连接池里,每次用的时候去POOL借用,用完后归还。

3、连接池代码

Redis系列(五) Pipeline & 事务 & 客户端 - 图12

三、redis简单使用实例

1、生产者

  1. package com.james.redis.queue1;
  2. import com.james.redis.utils.JedisUtils;
  3. public class RedisProducer {
  4. /**
  5. * jedis操作List
  6. */
  7. public static void main(String[] args){
  8. JedisUtils jedis = new JedisUtils("192.168.1.111", 6379,"12345678");
  9. for(int i = 0;i<10;i++) {
  10. jedis.lpush("informList","orderIdadb_" + i);
  11. }
  12. }
  13. }

2、消费者

  1. package com.james.redis.queue1;
  2. public class RedisConsumer {
  3. /**
  4. * jedis操作List
  5. */
  6. public static void main(String[] args){
  7. ScheduleMQ mq = new ScheduleMQ();
  8. mq.start();
  9. }
  10. }
  11. package com.james.redis.queue1;
  12. import java.util.List;
  13. import com.james.redis.utils.JedisUtils;
  14. class ScheduleMQ extends Thread {
  15. JedisUtils jedis = new JedisUtils("192.168.1.111", 6379,"12345678");
  16. @Override
  17. public void run() {
  18. while(true) {
  19. //阻塞式brpop,List中无数据时阻塞
  20. //参数0表示一直阻塞下去,直到List出现数据
  21. List<String> list = jedis.brpop(0, "informList");
  22. for(String s : list) {
  23. //处理业务逻辑
  24. System.out.println(s);
  25. }
  26. }
  27. }
  28. }

补充:JedisUtils和剩余的配置等内容可以到此下载:redis简单使用实例

四、序列化与反序列化

1、序列化工具

  1. package com.james.cache.utils;
  2. import java.io.ByteArrayInputStream;
  3. import java.io.ByteArrayOutputStream;
  4. import java.io.IOException;
  5. import java.util.List;
  6. import com.dyuproject.protostuff.LinkedBuffer;
  7. import com.dyuproject.protostuff.ProtostuffIOUtil;
  8. import com.dyuproject.protostuff.Schema;
  9. import com.dyuproject.protostuff.runtime.RuntimeSchema;
  10. /**
  11. *
  12. *
  13. <pre>
  14. * 序列化工具
  15. * </pre>
  16. *
  17. * @author James
  18. */
  19. public class SerializerUtils {
  20. public static <T> byte[] serialize(T obj) {
  21. if (obj == null) {
  22. throw new RuntimeException("序列化对象(" + obj + ")!");
  23. }
  24. @SuppressWarnings("unchecked")
  25. Schema<T> schema = (Schema<T>) RuntimeSchema.getSchema(obj.getClass());//获得对象的类,并通过对象的类构建对应的schema
  26. LinkedBuffer buffer = LinkedBuffer.allocate(1024 * 1024);
  27. byte[] protostuff = null;
  28. try {
  29. protostuff = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
  30. } catch (Exception e) {
  31. throw new RuntimeException("序列化(" + obj.getClass() + ")对象(" + obj + ")发生异常!", e);
  32. } finally {
  33. buffer.clear();
  34. }
  35. return protostuff;
  36. }
  37. public static <T> T deserialize(byte[] paramArrayOfByte, Class<T> targetClass) {
  38. if (paramArrayOfByte == null || paramArrayOfByte.length == 0) {
  39. throw new RuntimeException("反序列化对象发生异常,byte序列为空!");
  40. }
  41. T instance = null;
  42. try {
  43. // T message = objenesis.newInstance(cls);
  44. instance = targetClass.newInstance();
  45. } catch (InstantiationException | IllegalAccessException e) {
  46. throw new RuntimeException("反序列化过程中依据类型创建对象失败!", e);
  47. }
  48. Schema<T> schema = RuntimeSchema.getSchema(targetClass);
  49. ProtostuffIOUtil.mergeFrom(paramArrayOfByte, instance, schema);
  50. return instance;
  51. }
  52. public static <T> byte[] serializeList(List<T> objList) {
  53. if (objList == null || objList.isEmpty()) {
  54. throw new RuntimeException("序列化对象列表(" + objList + ")参数异常!");
  55. }
  56. @SuppressWarnings("unchecked")
  57. Schema<T> schema = (Schema<T>) RuntimeSchema.getSchema(objList.get(0).getClass());
  58. LinkedBuffer buffer = LinkedBuffer.allocate(1024 * 1024);
  59. byte[] protostuff = null;
  60. ByteArrayOutputStream bos = null;
  61. try {
  62. bos = new ByteArrayOutputStream();
  63. ProtostuffIOUtil.writeListTo(bos, objList, schema, buffer);
  64. protostuff = bos.toByteArray();
  65. } catch (Exception e) {
  66. throw new RuntimeException("序列化对象列表(" + objList + ")发生异常!", e);
  67. } finally {
  68. buffer.clear();
  69. try {
  70. if(bos!=null){
  71. bos.close();
  72. }
  73. } catch (IOException e) {
  74. e.printStackTrace();
  75. }
  76. }
  77. return protostuff;
  78. }
  79. public static <T> List<T> deserializeList(byte[] paramArrayOfByte, Class<T> targetClass) {
  80. if (paramArrayOfByte == null || paramArrayOfByte.length == 0) {
  81. throw new RuntimeException("反序列化对象发生异常,byte序列为空!");
  82. }
  83. Schema<T> schema = RuntimeSchema.getSchema(targetClass);
  84. List<T> result = null;
  85. try {
  86. result = ProtostuffIOUtil.parseListFrom(new ByteArrayInputStream(paramArrayOfByte), schema);
  87. } catch (IOException e) {
  88. throw new RuntimeException("反序列化对象列表发生异常!",e);
  89. }
  90. return result;
  91. }
  92. }

2、实体类

  1. package com.james.cache.entity;
  2. import java.io.Serializable;
  3. import java.util.Date;
  4. public class TCountDetail implements Serializable{
  5. private String id;
  6. private String ip;
  7. private Date optime;
  8. private String username;
  9. public String getId() {
  10. return id;
  11. }
  12. public void setId(String id) {
  13. this.id = id;
  14. }
  15. public String getIp() {
  16. return ip;
  17. }
  18. public void setIp(String ip) {
  19. this.ip = ip;
  20. }
  21. public Date getOptime() {
  22. return optime;
  23. }
  24. public void setOptime(Date optime) {
  25. this.optime = optime;
  26. }
  27. public String getUsername() {
  28. return username;
  29. }
  30. public void setUsername(String username) {
  31. this.username = username;
  32. }
  33. }

3、测试类

直接调用序列化工具进行序列化即可

  1. package com.james.cache;
  2. import java.io.Serializable;
  3. import java.util.Date;
  4. import java.util.HashMap;
  5. import java.util.Map;
  6. import org.junit.Test;
  7. import org.junit.runner.RunWith;
  8. import org.springframework.test.context.ContextConfiguration;
  9. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  10. import com.james.cache.entity.TCountDetail;
  11. import com.james.cache.utils.JedisUtils;
  12. import com.james.cache.utils.SerializerUtils;
  13. /*
  14. * 测试:对象序列化后存redis, 从redis取值后反序列化为JAVA对象
  15. */
  16. @RunWith(SpringJUnit4ClassRunner.class)
  17. @ContextConfiguration("classpath:applicationContext.xml")
  18. public class SerializerTest {
  19. @Test
  20. public void demo(){
  21. //工具类初始化
  22. JedisUtils jedis = new JedisUtils("192.168.1.111",6379, "12345678");
  23. //实体类TCountDetail初始化
  24. TCountDetail td = new TCountDetail();
  25. td.setId("1");
  26. td.setIp("127.0.0.1");
  27. td.setOptime(new Date());
  28. td.setUsername("james");
  29. //将键值序列化
  30. byte[] keyBytes = "user:1".getBytes();
  31. //序列化
  32. byte[] valueBytes = SerializerUtils.serialize(td);
  33. //将序列化的数据存入redis
  34. jedis.setSerializer(keyBytes, valueBytes);
  35. //从redis获取经序列化后的数据
  36. byte[] resultBytes = jedis.getSerializer(keyBytes);
  37. //反序列化,还原成对象
  38. TCountDetail obj = SerializerUtils.deserialize(resultBytes, TCountDetail.class);
  39. System.out.println("======="+obj.getUsername());
  40. }
  41. }