一、源端
1、结构展示
1.1 外层
1.2 内层
2、PROCESS
2.1 ExecuteGroovyScript
a) 描述
b) SETTINGS
c) SCHEDULING
d) PROPERTIES
{
"config": {
"mode": "Cluster",
"userName": "",
"password": "Dsjpt@2021",
"jedisPoolConfig": {
"maxIdle": "10",
"minIdle": "5",
"maxWaitMillis": "5000",
"soTimeout": "10000",
"maxTotal": "50",
"maxAttempts": "5"
},
"hostAndPortList": [
"10.83.68.151:6379",
"10.83.68.152:6379",
"10.83.68.153:6379",
"10.83.68.154:6379",
"10.83.68.155:6379",
"10.83.68.156:6379",
"10.83.68.157:6379",
"10.83.68.158:6379",
"10.83.68.159:6379"
]
},
"keyLocalFilePath": "/opt/dms_configs/zhoushan/redis_point.txt",
"keyList": [
"HB_GD_CLPLCF_FJ_P1_L1_001_AI0001",
"HB_GD_CLPLCF_FJ_P1_L1_001_AI0002",
"HB_GD_CLPLCF_FJ_P1_L1_001_AI0003"
]
}
二、目标端
1、结构展示
1.1 外层
1.2 内层
2、PROCESS
2.1 NifiFLow
a) 描述
-
b) PROPERTIES
2.2 ExecuteGroovyScript
a) 描述
-
b) SETTINGS
c) SCHEDULING
d) PROPERTIES
三、附页
1、附1页
1. 集群模式get脚本
```groovy
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import redis.clients.jedis.JedisPoolConfig
import redis.clients.jedis.JedisCluster
import redis.clients.jedis.HostAndPort;
/*
{
"config": {
"mode": "Cluster",
"function": "get",
"userName": "",
"password": "Dsjpt@2021",
"jedisPoolConfig": {
"maxIdle": "10",
"minIdle": "5",
"maxWaitMillis": "5000",
"soTimeout": "10000",
"maxTotal": "50",
"maxAttempts": "5"
},
"hostAndPortList": [
"10.83.68.151:6379",
"10.83.68.152:6379",
"10.83.68.153:6379",
"10.83.68.154:6379",
"10.83.68.155:6379",
"10.83.68.156:6379",
"10.83.68.157:6379",
"10.83.68.158:6379",
"10.83.68.159:6379"
]
},
"keyLocalFilePath": "/opt/dms_configs/zhoushan/redis_point.txt",
"keyList": [
"HB_GD_CLPLCF_FJ_P1_L1_001_AI0001"
]
}
*/
// ---------------------------------------------------------------------------
// Script entry point: read a list of keys from a Redis cluster and emit one
// flow file per key that has a value.
// `Info` is not defined in this script; it is expected to hold the JSON
// configuration text (config / keyLocalFilePath / keyList).
// ---------------------------------------------------------------------------
JsonSlurper jsonSlurper = new JsonSlurper()
def jsonInfo = jsonSlurper.parseText(Info as String)

// NOTE(review): in this copy of the script the statement that declared
// `keyList` survived only as the bare word `List`, and the header of the
// file-reading helper at the bottom was lost as well. Both are reconstructed
// here on the ASSUMPTION that keys come from `keyLocalFilePath` when it is set
// and from the inline `keyList` otherwise - confirm against the deployed script.
String keyFilePath = jsonInfo.keyLocalFilePath
List<String> keyList = keyFilePath ? readKeysFromFile(keyFilePath) : (jsonInfo.keyList ?: [])

JedisCluster jedisCluster = getJedisClusterResource(jsonInfo)
try {
    keyList.each { key ->
        String val = jedisCluster.get(key)
        // Keys for which the cluster returns null produce no flow file.
        if (val != null) {
            writeFlowFile(key, val)
        }
    }
} finally {
    // Release the cluster connections even when a lookup or a write fails.
    jedisCluster.close()
}

/**
 * Builds a JedisCluster client from the parsed configuration.
 *
 * @param config parsed JSON; reads config.config.jedisPoolConfig
 *               (maxIdle, minIdle, maxTotal, maxWaitMillis, soTimeout, maxAttempts),
 *               config.config.hostAndPortList ("host:port" entries) and config.config.password
 * @return a new JedisCluster; the caller is responsible for closing it
 */
JedisCluster getJedisClusterResource(def config) {
    def jedisConfig = config.config.jedisPoolConfig

    // Connection pool settings.
    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig()
    jedisPoolConfig.setMaxIdle(Integer.valueOf(jedisConfig.maxIdle))             // maximum idle connections
    jedisPoolConfig.setMinIdle(Integer.valueOf(jedisConfig.minIdle))             // minimum idle connections
    jedisPoolConfig.setMaxTotal(Integer.valueOf(jedisConfig.maxTotal))           // maximum connections in the pool
    jedisPoolConfig.setMaxWaitMillis(Integer.valueOf(jedisConfig.maxWaitMillis)) // pool wait limit, in milliseconds

    // Cluster node list, one "host:port" string per node.
    Set<HostAndPort> nodes = new HashSet<>()
    config.config.hostAndPortList.each { nodeStr ->
        String[] hostAndPort = nodeStr.split(":")
        nodes.add(new HostAndPort(hostAndPort[0], Integer.valueOf(hostAndPort[1])))
    }

    // maxWaitMillis is also used as the connection timeout.
    Integer connectionTimeout = Integer.valueOf(jedisConfig.maxWaitMillis)
    Integer soTimeout = Integer.valueOf(jedisConfig.soTimeout)
    Integer maxAttempts = Integer.valueOf(jedisConfig.maxAttempts)

    String password = config.config.password

    return new JedisCluster(nodes, connectionTimeout, soTimeout, maxAttempts, password, jedisPoolConfig)
}

/**
 * Creates a flow file whose content is {@code value} (UTF-8), tags it with the
 * attribute KEY = {@code key} and routes it to REL_SUCCESS.
 */
def writeFlowFile(String key, String value) {
    def newFlowFile = session.create()
    newFlowFile = session.write(newFlowFile, { outputStream ->
        outputStream.write(value.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    newFlowFile = session.putAttribute(newFlowFile, 'KEY', key)
    session.transfer(newFlowFile, REL_SUCCESS)
}

/**
 * Creates a flow file whose content is {@code str} (UTF-8) and routes it to
 * REL_SUCCESS without setting any attribute.
 */
def writeFlowFile(String str) {
    def newFlowFile = session.create()
    newFlowFile = session.write(newFlowFile, { outputStream ->
        outputStream.write(str.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    session.transfer(newFlowFile, REL_SUCCESS)
}

/**
 * Reads keys from a local text file, one key per line; blank lines are skipped.
 *
 * NOTE(review): the original name and signature of this helper were lost in
 * this copy; only the body (which uses `filePath` and returns `keys`) survived.
 *
 * @param filePath path of the text file to read
 * @return the non-blank lines of the file, in file order
 */
List<String> readKeysFromFile(String filePath) {
    List<String> keys = new ArrayList<>()
    // Closing the BufferedReader also closes the FileReader it wraps.
    BufferedReader br = new BufferedReader(new FileReader(filePath))
    try {
        String key = null
        // readLine() returns null once the end of the file is reached.
        while ((key = br.readLine()) != null) {
            if (!key.trim().isEmpty()) {
                keys.add(key)
            }
        }
    } finally {
        br.close()
    }
    return keys
}
2、附2页
1. 集群模式set脚本
```groovy
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import redis.clients.jedis.JedisPoolConfig
import redis.clients.jedis.JedisCluster
import redis.clients.jedis.HostAndPort;
/*
{
"config": {
"mode": "Cluster",
"function": "get",
"userName": "",
"password": "Dsjpt@2021",
"jedisPoolConfig": {
"maxIdle": "10",
"minIdle": "5",
"maxWaitMillis": "5000",
"soTimeout": "10000",
"maxTotal": "50",
"maxAttempts": "5"
},
"hostAndPortList": [
"10.83.68.151:6379",
"10.83.68.152:6379",
"10.83.68.153:6379",
"10.83.68.154:6379",
"10.83.68.155:6379",
"10.83.68.156:6379",
"10.83.68.157:6379",
"10.83.68.158:6379",
"10.83.68.159:6379"
]
}
}
*/
// ---------------------------------------------------------------------------
// Script entry point: take up to 10000 queued flow files and store each one in
// the Redis cluster as  KEY attribute -> flow file content.
// ---------------------------------------------------------------------------

// Fetch the batch first so that no Redis connection is opened (and none can be
// leaked) when there is nothing to process.
def flowFileList = session.get(10000)
if (flowFileList.isEmpty()) {
    return
}

// `Info` is not defined in this script; it is expected to hold the JSON
// configuration text.
JsonSlurper jsonSlurper = new JsonSlurper()
def jsonInfo = jsonSlurper.parseText(Info as String)

def storedFlowFiles = []
def rejectedFlowFiles = []

JedisCluster jedisCluster = getJedisClusterResource(jsonInfo)
try {
    for (flowFile in flowFileList) {
        if (flowFile == null) {
            continue
        }
        String redisKey = flowFile.getAttribute('KEY')
        if (redisKey == null) {
            // Without a KEY attribute there is nothing to store the content under;
            // route this flow file to failure instead of failing the whole batch.
            rejectedFlowFiles << flowFile
            continue
        }
        jedisCluster.set(redisKey, readFlowFile(flowFile))
        storedFlowFiles << flowFile
    }
} finally {
    // Always release the cluster connections; an exception raised by set()
    // still propagates to the caller after the close.
    jedisCluster.close()
}

session.transfer(storedFlowFiles, REL_SUCCESS)
if (!rejectedFlowFiles.isEmpty()) {
    session.transfer(rejectedFlowFiles, REL_FAILURE)
}
/**
 * Builds a JedisCluster client from the parsed configuration.
 *
 * @param config parsed JSON; reads config.config.jedisPoolConfig
 *               (maxIdle, minIdle, maxTotal, maxWaitMillis, soTimeout, maxAttempts),
 *               config.config.hostAndPortList ("host:port" entries) and config.config.password
 * @return a new JedisCluster; the caller is responsible for closing it
 */
JedisCluster getJedisClusterResource(def config) {
    def poolSettings = config.config.jedisPoolConfig

    // Connection pool limits.
    JedisPoolConfig pool = new JedisPoolConfig()
    pool.setMaxIdle(Integer.valueOf(poolSettings.maxIdle))       // maximum idle connections
    pool.setMinIdle(Integer.valueOf(poolSettings.minIdle))       // minimum idle connections
    pool.setMaxTotal(Integer.valueOf(poolSettings.maxTotal))     // maximum connections in the pool
    // maxWaitMillis is parsed once; it is the pool wait limit and, below, the connection timeout.
    Integer waitMillis = Integer.valueOf(poolSettings.maxWaitMillis)
    pool.setMaxWaitMillis(waitMillis)

    // Turn every "host:port" entry into a cluster node address.
    Set<HostAndPort> clusterNodes = new HashSet<>()
    for (entry in config.config.hostAndPortList) {
        String[] parts = entry.split(":")
        clusterNodes.add(new HostAndPort(parts[0], Integer.valueOf(parts[1])))
    }

    // Socket timeout and retry budget.
    Integer socketTimeout = Integer.valueOf(poolSettings.soTimeout)
    Integer attempts = Integer.valueOf(poolSettings.maxAttempts)

    // Cluster password.
    String secret = config.config.password

    return new JedisCluster(clusterNodes, waitMillis, socketTimeout, attempts, secret, pool)
}
/**
 * Returns the whole content of a flow file decoded as UTF-8 text.
 *
 * @param flowFile the flow file to read through the script's `session`
 * @return the content as a String; "" when the content is empty
 */
String readFlowFile(def flowFile) {
    // The callback runs inside session.read(); the builder carries the text back out.
    StringBuilder text = new StringBuilder()
    def collect = { stream ->
        text.append(IOUtils.toString(stream, StandardCharsets.UTF_8))
    } as InputStreamCallback
    session.read(flowFile, collect)
    return text.toString()
}