Redis依赖项
image.png

一、源端

1、结构展示

1.1 外层

image.png

1.2 内层

image.png

2、PROCESS

2.1 ExecuteGroovyScript

a) 描述

b) SETTINGS

image.png

c) SCHEDULING

image.png

d) PROPERTIES

image.png

  1. {
  2. "config": {
  3. "mode": "Cluster",
  4. "userName": "",
  5. "password": "Dsjpt@2021",
  6. "jedisPoolConfig": {
  7. "maxIdle": "10",
  8. "minIdle": "5",
  9. "maxWaitMillis": "5000",
  10. "soTimeout": "10000",
  11. "maxTotal": "50",
  12. "maxAttempts": "5"
  13. },
  14. "hostAndPortList": [
  15. "10.83.68.151:6379",
  16. "10.83.68.152:6379",
  17. "10.83.68.153:6379",
  18. "10.83.68.154:6379",
  19. "10.83.68.155:6379",
  20. "10.83.68.156:6379",
  21. "10.83.68.157:6379",
  22. "10.83.68.158:6379",
  23. "10.83.68.159:6379"
  24. ]
  25. },
  26. "keyLocalFilePath": "/opt/dms_configs/zhoushan/redis_point.txt",
  27. "keyList": [
  28. "HB_GD_CLPLCF_FJ_P1_L1_001_AI0001",
  29. "HB_GD_CLPLCF_FJ_P1_L1_001_AI0002",
  30. "HB_GD_CLPLCF_FJ_P1_L1_001_AI0003"
  31. ]
  32. }

二、目标端

1、结构展示

1.1 外层

image.png

1.2 内层

image.png

2、PROCESS

2.1 NifiFLow

a) 描述
  1. 指定集群 URL,获取跨集群传入的流文件

    b) PROPERTIES

    image.png

    2.2 ExecuteGroovyScript

    a) 描述
  2. 根据用户指定的 KEY 做 set 操作

    b) SETTINGS

    image.png

    c) SCHEDULING

    image.png

    d) PROPERTIES

    image.png

    三、附页

    1、附1页

  3. 集群模式get脚本

```groovy
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import redis.clients.jedis.JedisPoolConfig
import redis.clients.jedis.JedisCluster
import redis.clients.jedis.HostAndPort;

/*
{
  "config": {
    "mode": "Cluster",
    "function": "get",
    "userName": "",
    "password": "Dsjpt@2021",
    "jedisPoolConfig": {
      "maxIdle": "10",
      "minIdle": "5",
      "maxWaitMillis": "5000",
      "soTimeout": "10000",
      "maxTotal": "50",
      "maxAttempts": "5"
    },
    "hostAndPortList": [
      "10.83.68.151:6379",
      "10.83.68.152:6379",
      "10.83.68.153:6379",
      "10.83.68.154:6379",
      "10.83.68.155:6379",
      "10.83.68.156:6379",
      "10.83.68.157:6379",
      "10.83.68.158:6379",
      "10.83.68.159:6379"
    ]
  },
  "keyLocalFilePath": "/opt/dms_configs/zhoushan/redis_point.txt",
  "keyList": [
    "HB_GD_CLPLCF_FJ_P1_L1_001_AI0001"
  ]
}
*/

// Parse the JSON configuration supplied through the processor's `Info` property
// and open the Redis cluster connection described by it.
def jsonInfo = new JsonSlurper().parseText(Info as String)
JedisCluster jedisCluster = getJedisClusterResource(jsonInfo)

// Keys come from the local key file when a path is configured,
// otherwise from the inline `keyList` of the configuration.
String filePath = jsonInfo.keyLocalFilePath
List keyList = (filePath != null) ? getContent(filePath) : jsonInfo.keyList

// Emit one FlowFile per key that currently has a value in Redis; missing keys are skipped.
keyList.each { key ->
    String val = jedisCluster.get(key)
    if (val != null) {
        writeFlowFile(key, val)
    }
}

// All keys have been processed: release the cluster connections.
jedisCluster.close()

/**
 * Builds a JedisCluster client from the parsed JSON configuration.
 *
 * Reads `config.config.jedisPoolConfig` (maxIdle, minIdle, maxTotal, maxWaitMillis,
 * soTimeout, maxAttempts), `config.config.hostAndPortList` ("host:port" strings)
 * and `config.config.password`.
 *
 * @param config parsed JSON configuration object
 * @return a new JedisCluster; the caller is responsible for closing it
 */
JedisCluster getJedisClusterResource(def config) {
    def poolSettings = config.config.jedisPoolConfig

    // Connection pool limits.
    JedisPoolConfig poolConfig = new JedisPoolConfig()
    poolConfig.setMaxIdle(Integer.valueOf(poolSettings.maxIdle))             // maximum idle connections
    poolConfig.setMinIdle(Integer.valueOf(poolSettings.minIdle))             // minimum idle connections
    poolConfig.setMaxTotal(Integer.valueOf(poolSettings.maxTotal))           // maximum connections in the pool
    poolConfig.setMaxWaitMillis(Integer.valueOf(poolSettings.maxWaitMillis)) // maximum wait when borrowing a connection

    // Cluster node addresses, each given as "host:port".
    Set<HostAndPort> clusterNodes = new HashSet<>()
    config.config.hostAndPortList.each { address ->
        String[] parts = address.split(":")
        clusterNodes.add(new HostAndPort(parts[0], Integer.valueOf(parts[1])))
    }

    // Timeouts and retry count; maxWaitMillis doubles as the connection timeout.
    Integer connectTimeoutMs = Integer.valueOf(poolSettings.maxWaitMillis)
    Integer socketTimeoutMs = Integer.valueOf(poolSettings.soTimeout)
    Integer attempts = Integer.valueOf(poolSettings.maxAttempts)

    // Password used to authenticate against the cluster.
    String secret = config.config.password

    return new JedisCluster(clusterNodes, connectTimeoutMs, socketTimeoutMs, attempts, secret, poolConfig)
}

/**
 * Creates a new FlowFile whose content is {@code value} encoded as UTF-8, stores the
 * Redis key in the {@code KEY} attribute and routes the FlowFile to REL_SUCCESS.
 *
 * @param key   Redis key, written to the FlowFile attribute 'KEY'
 * @param value Redis value, written as the FlowFile content
 */
def writeFlowFile(String key, String value) {
    // Declared with `def`: an undeclared assignment would store the variable in the script binding.
    def newFlowFile = session.create()
    // Keep the FlowFile reference returned by each session call for the next call.
    newFlowFile = session.write(newFlowFile, { outputStream ->
        outputStream.write(value.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    newFlowFile = session.putAttribute(newFlowFile, 'KEY', key)
    session.transfer(newFlowFile, REL_SUCCESS)
}

/**
 * Creates a new FlowFile whose content is {@code str} encoded as UTF-8 and routes it
 * to REL_SUCCESS. No attributes are added.
 *
 * @param str text written as the FlowFile content
 */
def writeFlowFile(String str) {
    // Declared with `def`: an undeclared assignment would store the variable in the script binding.
    def newFlowFile = session.create()
    // Keep the FlowFile reference returned by session.write for the transfer.
    newFlowFile = session.write(newFlowFile, { outputStream ->
        outputStream.write(str.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    session.transfer(newFlowFile, REL_SUCCESS)
}

/**
 * Reads the key file and returns its lines, one Redis key per line, in file order.
 *
 * The file is decoded as UTF-8, matching the encoding used for FlowFile content
 * elsewhere in this script. The underlying reader is closed by readLines even when
 * reading fails, so no file handle is leaked.
 *
 * @param filePath path of the local text file containing one key per line
 * @return list of lines read from the file (empty when the file is empty)
 * @throws IOException if the file cannot be opened or read
 */
List getContent(String filePath) throws IOException {
    return new File(filePath).readLines(StandardCharsets.UTF_8.name())
}

  1. 2、附2
  2. 1. 集群模式set脚本
  3. ```groovy
  4. import groovy.json.JsonSlurper
  5. import groovy.json.JsonOutput
  6. import org.apache.commons.io.IOUtils
  7. import java.nio.charset.StandardCharsets
  8. import redis.clients.jedis.JedisPoolConfig
  9. import redis.clients.jedis.JedisCluster
  10. import redis.clients.jedis.HostAndPort;
  11. /*
  12. {
  13. "config": {
  14. "mode": "Cluster",
  15. "function": "set",
  16. "userName": "",
  17. "password": "Dsjpt@2021",
  18. "jedisPoolConfig": {
  19. "maxIdle": "10",
  20. "minIdle": "5",
  21. "maxWaitMillis": "5000",
  22. "soTimeout": "10000",
  23. "maxTotal": "50",
  24. "maxAttempts": "5"
  25. },
  26. "hostAndPortList": [
  27. "10.83.68.151:6379",
  28. "10.83.68.152:6379",
  29. "10.83.68.153:6379",
  30. "10.83.68.154:6379",
  31. "10.83.68.155:6379",
  32. "10.83.68.156:6379",
  33. "10.83.68.157:6379",
  34. "10.83.68.158:6379",
  35. "10.83.68.159:6379"
  36. ]
  37. }
  38. }
  39. */
  // Pull the batch first (up to 10000 FlowFiles) so that no Redis connection
  // is opened, and none is leaked, when there is nothing to process.
  def flowFileList = session.get(10000)
  if (flowFileList.isEmpty()) {
      return
  }

  // Parse the JSON configuration supplied through the processor's `Info` property.
  def jsonInfo = new JsonSlurper().parseText(Info as String)
  JedisCluster jedisCluster = getJedisClusterResource(jsonInfo)

  def written = []   // FlowFiles whose value was stored in Redis
  def rejected = []  // FlowFiles without a KEY attribute
  try {
      for (flowFile in flowFileList) {
          String key = flowFile.getAttribute('KEY')
          if (key == null) {
              // Without a key there is nothing to SET; fail this FlowFile only, not the whole batch.
              rejected << flowFile
              continue
          }
          jedisCluster.set(key, readFlowFile(flowFile))
          written << flowFile
      }
  } finally {
      // Always release the cluster connections, even when a SET throws.
      jedisCluster.close()
  }
  session.transfer(written, REL_SUCCESS)
  session.transfer(rejected, REL_FAILURE)
  /**
   * Builds a JedisCluster client from the parsed JSON configuration.
   *
   * Reads `config.config.jedisPoolConfig` (maxIdle, minIdle, maxTotal, maxWaitMillis,
   * soTimeout, maxAttempts), `config.config.hostAndPortList` ("host:port" strings)
   * and `config.config.password`.
   *
   * @param config parsed JSON configuration object
   * @return a new JedisCluster; the caller is responsible for closing it
   */
  JedisCluster getJedisClusterResource(def config) {
      def poolSettings = config.config.jedisPoolConfig

      // Connection pool limits.
      JedisPoolConfig poolConfig = new JedisPoolConfig()
      poolConfig.setMaxIdle(Integer.valueOf(poolSettings.maxIdle))             // maximum idle connections
      poolConfig.setMinIdle(Integer.valueOf(poolSettings.minIdle))             // minimum idle connections
      poolConfig.setMaxTotal(Integer.valueOf(poolSettings.maxTotal))           // maximum connections in the pool
      poolConfig.setMaxWaitMillis(Integer.valueOf(poolSettings.maxWaitMillis)) // maximum wait when borrowing a connection

      // Cluster node addresses, each given as "host:port".
      Set<HostAndPort> clusterNodes = new HashSet<>()
      config.config.hostAndPortList.each { address ->
          String[] parts = address.split(":")
          clusterNodes.add(new HostAndPort(parts[0], Integer.valueOf(parts[1])))
      }

      // Timeouts and retry count; maxWaitMillis doubles as the connection timeout.
      Integer connectTimeoutMs = Integer.valueOf(poolSettings.maxWaitMillis)
      Integer socketTimeoutMs = Integer.valueOf(poolSettings.soTimeout)
      Integer attempts = Integer.valueOf(poolSettings.maxAttempts)

      // Password used to authenticate against the cluster.
      String secret = config.config.password

      return new JedisCluster(clusterNodes, connectTimeoutMs, socketTimeoutMs, attempts, secret, poolConfig)
  }
  /**
   * Reads the whole content of a FlowFile and returns it as a UTF-8 decoded string.
   *
   * @param flowFile the FlowFile whose content is read through the session
   * @return the FlowFile content as text
   */
  String readFlowFile(def flowFile) {
      String payload = ""
      // The callback receives the content stream; the decoded text is captured in `payload`.
      def contentReader = { stream ->
          payload = IOUtils.toString(stream, StandardCharsets.UTF_8)
      } as InputStreamCallback
      session.read(flowFile, contentReader)
      return payload
  }